2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.CompletionException;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.function.BiConsumer;
35 import java.util.function.Function;
36 import java.util.function.Supplier;
37 import java.util.function.UnaryOperator;
38 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
39 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
40 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
41 import org.onap.policy.common.utils.coder.Coder;
42 import org.onap.policy.common.utils.coder.CoderException;
43 import org.onap.policy.common.utils.coder.StandardCoder;
44 import org.onap.policy.controlloop.ControlLoopOperation;
45 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
46 import org.onap.policy.controlloop.actorserviceprovider.Operation;
47 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
48 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
49 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
50 import org.onap.policy.controlloop.policy.PolicyResult;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * Partial implementation of an operator. In general, it's preferable that subclasses
56 * would override {@link #startOperationAsync(int, OperationOutcome)
57 * startOperationAsync()}. However, if that proves to be too difficult, then they can
58 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
59 * if the operation requires any preprocessor steps, the subclass may choose to override
60 * {@link #startPreprocessorAsync()}.
62 * The futures returned by the methods within this class can be canceled, and will
63 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
64 * returned by overridden methods will do the same. Of course, if a class overrides
65 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
66 * be done to cancel that particular operation.
68 public abstract class OperationPartial implements Operation {
69 private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
70 private static final Coder coder = new StandardCoder();
71 public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
73 // values extracted from the operator
75 private final OperatorPartial operator;
78 * Operation parameters.
80 protected final ControlLoopOperationParams params;
/**
 * Creates an operation that is bound to the operator that spawned it.
 *
 * @param params operation parameters
 * @param operator operator that created this operation
 */
public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) {
    // "params" is a final field, so it must be assigned here alongside the operator
    this.params = params;
    this.operator = operator;
}
94 public Executor getBlockingExecutor() {
95 return operator.getBlockingExecutor();
98 public String getFullName() {
99 return operator.getFullName();
102 public String getActorName() {
103 return operator.getActorName();
106 public String getName() {
107 return operator.getName();
111 public final CompletableFuture<OperationOutcome> start() {
112 if (!operator.isAlive()) {
113 throw new IllegalStateException("operation is not running: " + getFullName());
116 // allocate a controller for the entire operation
117 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
119 CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
120 if (preproc == null) {
121 // no preprocessor required - just start the operation
122 return startOperationAttempt(controller, 1);
126 * Do preprocessor first and then, if successful, start the operation. Note:
127 * operations create their own outcome, ignoring the outcome from any previous
130 * Wrap the preprocessor to ensure "stop" is propagated to it.
133 controller.wrap(preproc)
134 .exceptionally(fromException("preprocessor of operation"))
135 .thenCompose(handlePreprocessorFailure(controller))
136 .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
137 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
144 * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
145 * invokes the call-backs, marks the controller complete, and returns an incomplete
146 * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
149 * Assumes that no callbacks have been invoked yet.
151 * @param controller pipeline controller
152 * @return a function that checks the outcome status and continues, if successful, or
153 * indicates a failure otherwise
155 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
156 PipelineControllerFuture<OperationOutcome> controller) {
160 if (outcome != null && isSuccess(outcome)) {
161 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
162 return CompletableFuture.completedFuture(outcome);
165 logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
167 final Executor executor = params.getExecutor();
168 final CallbackManager callbacks = new CallbackManager();
170 // propagate "stop" to the callbacks
171 controller.add(callbacks);
173 final OperationOutcome outcome2 = params.makeOutcome();
175 // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
177 outcome2.setResult(PolicyResult.FAILURE_GUARD);
178 outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
181 CompletableFuture.completedFuture(outcome2)
182 .whenCompleteAsync(callbackStarted(callbacks), executor)
183 .whenCompleteAsync(callbackCompleted(callbacks), executor)
184 .whenCompleteAsync(controller.delayedComplete(), executor);
187 return new CompletableFuture<>();
192 * Invokes the operation's preprocessor step(s) as a "future". This method simply
193 * invokes {@link #startGuardAsync()}.
195 * This method assumes the following:
197 * <li>the operator is alive</li>
198 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
201 * @return a function that will start the preprocessor and returns its outcome, or
202 * {@code null} if this operation needs no preprocessor
204 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
205 return startGuardAsync();
209 * Invokes the operation's guard step(s) as a "future". This method simply returns
212 * This method assumes the following:
214 * <li>the operator is alive</li>
215 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
218 * @return a function that will start the guard checks and returns its outcome, or
219 * {@code null} if this operation has no guard
221 protected CompletableFuture<OperationOutcome> startGuardAsync() {
226 * Starts the operation attempt, with no preprocessor. When all retries complete, it
227 * will complete the controller.
229 * @param controller controller for all operation attempts
230 * @param attempt attempt number, typically starting with 1
231 * @return a future that will return the final result of all attempts
233 private CompletableFuture<OperationOutcome> startOperationAttempt(
234 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
236 // propagate "stop" to the operation attempt
237 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
238 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
244 * Starts the operation attempt, without doing any retries.
246 * @param params operation parameters
247 * @param attempt attempt number, typically starting with 1
248 * @return a future that will return the result of a single operation attempt
250 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
252 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
254 final Executor executor = params.getExecutor();
255 final OperationOutcome outcome = params.makeOutcome();
256 final CallbackManager callbacks = new CallbackManager();
258 // this operation attempt gets its own controller
259 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
261 // propagate "stop" to the callbacks
262 controller.add(callbacks);
265 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
266 .whenCompleteAsync(callbackStarted(callbacks), executor)
267 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
270 // handle timeouts, if specified
271 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
272 if (timeoutMillis > 0) {
273 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
274 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
278 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
279 * before callbackCompleted() is invoked.
281 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
282 * the pipeline as the last step anyway.
286 future.exceptionally(fromException("operation"))
287 .thenApply(setRetryFlag(attempt))
288 .whenCompleteAsync(callbackStarted(callbacks), executor)
289 .whenCompleteAsync(callbackCompleted(callbacks), executor)
290 .whenCompleteAsync(controller.delayedComplete(), executor);
297 * Determines if the outcome was successful.
299 * @param outcome outcome to examine
300 * @return {@code true} if the outcome was successful
302 protected boolean isSuccess(OperationOutcome outcome) {
303 return (outcome.getResult() == PolicyResult.SUCCESS);
307 * Determines if the outcome was a failure for this operator.
309 * @param outcome outcome to examine, or {@code null}
310 * @return {@code true} if the outcome is not {@code null} and was a failure
311 * <i>and</i> was associated with this operator, {@code false} otherwise
313 protected boolean isActorFailed(OperationOutcome outcome) {
314 return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
318 * Determines if the given outcome is for this operation.
320 * @param outcome outcome to examine
321 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
323 protected boolean isSameOperation(OperationOutcome outcome) {
324 return OperationOutcome.isFor(outcome, getActorName(), getName());
328 * Invokes the operation as a "future". This method simply invokes
329 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
330 * returning the result via a "future".
332 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
333 * the executor in the "params", as that may bring the background thread pool to a
334 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
337 * This method assumes the following:
339 * <li>the operator is alive</li>
340 * <li>verifyRunning() has been invoked</li>
341 * <li>callbackStarted() has been invoked</li>
342 * <li>the invoker will perform appropriate timeout checks</li>
343 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
346 * @param attempt attempt number, typically starting with 1
347 * @return a function that will start the operation and return its result when
350 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
352 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
356 * Low-level method that performs the operation. This can make the same assumptions
357 * that are made by {@link #doOperationAsFuture()}. This particular method simply
358 * throws an {@link UnsupportedOperationException}.
360 * @param attempt attempt number, typically starting with 1
361 * @param operation the operation being performed
362 * @return the outcome of the operation
364 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
366 throw new UnsupportedOperationException("start operation " + getFullName());
370 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
371 * FAILURE, assuming the policy specifies retries and the retry count has been
374 * @param attempt latest attempt number, starting with 1
375 * @return a function to get the next future to execute
377 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
379 return operation -> {
380 if (operation != null && !isActorFailed(operation)) {
382 * wrong type or wrong operation - just leave it as is. No need to log
383 * anything here, as retryOnFailure() will log a message
388 // get a non-null operation
389 OperationOutcome oper2;
390 if (operation != null) {
393 oper2 = params.makeOutcome();
394 oper2.setResult(PolicyResult.FAILURE);
397 int retry = getRetry(params.getRetry());
398 if (retry > 0 && attempt > retry) {
400 * retries were specified and we've already tried them all - change to
403 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
404 oper2.setResult(PolicyResult.FAILURE_RETRIES);
412 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
413 * was previously invoked, and thus that the "operation" is not {@code null}.
415 * @param controller controller for all of the retries
416 * @param attempt latest attempt number, starting with 1
417 * @return a function to get the next future to execute
419 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
420 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
422 return operation -> {
423 if (!isActorFailed(operation)) {
424 // wrong type or wrong operation - just leave it as is
425 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
426 controller.complete(operation);
427 return new CompletableFuture<>();
430 if (getRetry(params.getRetry()) <= 0) {
431 // no retries - already marked as FAILURE, so just return it
432 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
433 controller.complete(operation);
434 return new CompletableFuture<>();
438 * Retry the operation.
440 long waitMs = getRetryWaitMs();
441 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
443 return sleep(waitMs, TimeUnit.MILLISECONDS)
444 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
449 * Convenience method that starts a sleep(), running via a future.
451 * @param sleepTime time to sleep
452 * @param unit time unit
453 * @return a future that will complete when the sleep completes
455 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
456 if (sleepTime <= 0) {
457 return CompletableFuture.completedFuture(null);
460 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
464 * Converts an exception into an operation outcome, returning a copy of the outcome to
465 * prevent background jobs from changing it.
467 * @param type type of item throwing the exception
468 * @return a function that will convert an exception into an operation outcome
470 private Function<Throwable, OperationOutcome> fromException(String type) {
473 OperationOutcome outcome = params.makeOutcome();
475 logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
476 params.getRequestId(), thrown);
478 return setOutcome(outcome, thrown);
483 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
484 * any outstanding futures when one completes.
486 * @param futureMakers function to make a future. If the function returns
487 * {@code null}, then no future is created for that function. On the other
488 * hand, if the function throws an exception, then the previously created
489 * functions are canceled and the exception is re-thrown
490 * @return a future to cancel or await an outcome, or {@code null} if no futures were
491 * created. If this future is canceled, then all of the futures will be
494 protected CompletableFuture<OperationOutcome> anyOf(
495 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
497 return anyOf(Arrays.asList(futureMakers));
501 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
502 * any outstanding futures when one completes.
504 * @param futureMakers function to make a future. If the function returns
505 * {@code null}, then no future is created for that function. On the other
506 * hand, if the function throws an exception, then the previously created
507 * functions are canceled and the exception is re-thrown
508 * @return a future to cancel or await an outcome, or {@code null} if no futures were
509 * created. If this future is canceled, then all of the futures will be
510 * canceled. Similarly, when this future completes, any incomplete futures
513 protected CompletableFuture<OperationOutcome> anyOf(
514 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
516 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
518 CompletableFuture<OperationOutcome>[] futures =
519 attachFutures(controller, futureMakers, UnaryOperator.identity());
521 if (futures.length == 0) {
522 // no futures were started
526 if (futures.length == 1) {
530 CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
531 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
537 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
539 * @param futureMakers function to make a future. If the function returns
540 * {@code null}, then no future is created for that function. On the other
541 * hand, if the function throws an exception, then the previously created
542 * functions are canceled and the exception is re-thrown
543 * @return a future to cancel or await an outcome, or {@code null} if no futures were
544 * created. If this future is canceled, then all of the futures will be
547 protected CompletableFuture<OperationOutcome> allOf(
548 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
550 return allOf(Arrays.asList(futureMakers));
554 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
556 * @param futureMakers function to make a future. If the function returns
557 * {@code null}, then no future is created for that function. On the other
558 * hand, if the function throws an exception, then the previously created
559 * functions are canceled and the exception is re-thrown
560 * @return a future to cancel or await an outcome, or {@code null} if no futures were
561 * created. If this future is canceled, then all of the futures will be
562 * canceled. Similarly, when this future completes, any incomplete futures
565 protected CompletableFuture<OperationOutcome> allOf(
566 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
567 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
569 Queue<OperationOutcome> outcomes = new LinkedList<>();
571 CompletableFuture<OperationOutcome>[] futures =
572 attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
573 synchronized (outcomes) {
574 outcomes.add(outcome);
579 if (futures.length == 0) {
580 // no futures were started
584 if (futures.length == 1) {
589 CompletableFuture.allOf(futures)
590 .thenApply(unused -> combineOutcomes(outcomes))
591 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
598 * Invokes the functions to create the futures and attaches them to the controller.
600 * @param controller master controller for all of the futures
601 * @param futureMakers futures to be attached to the controller
602 * @param adorn function that "adorns" the future, possible adding onto its pipeline.
603 * Returns the adorned future
604 * @return an array of futures, possibly zero-length. If the array is of size one,
605 * then that one item should be returned instead of the controller
607 private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
608 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
609 UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
611 if (futureMakers.isEmpty()) {
612 @SuppressWarnings("unchecked")
613 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
617 // the last, unadorned future that is created
618 CompletableFuture<OperationOutcome> lastFuture = null;
620 List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
623 for (var maker : futureMakers) {
625 CompletableFuture<OperationOutcome> future = maker.get();
626 if (future == null) {
630 // propagate "stop" to the future
631 controller.add(future);
633 futures.add(adorn.apply(future));
637 } catch (RuntimeException e) {
638 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
639 controller.cancel(false);
644 @SuppressWarnings("unchecked")
645 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
647 if (result.length == 1) {
648 // special case - return the unadorned future
649 result[0] = lastFuture;
653 return futures.toArray(result);
657 * Combines the outcomes from a set of tasks.
659 * @param outcomes outcomes to be examined
660 * @return the combined outcome
662 private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
664 // identify the outcome with the highest priority
665 OperationOutcome outcome = outcomes.remove();
666 int priority = detmPriority(outcome);
668 for (OperationOutcome outcome2 : outcomes) {
669 int priority2 = detmPriority(outcome2);
671 if (priority2 > priority) {
673 priority = priority2;
677 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
678 (outcome == null ? null : outcome.getResult()), params.getRequestId());
684 * Determines the priority of an outcome based on its result.
686 * @param outcome outcome to examine, or {@code null}
687 * @return the outcome's priority
689 protected int detmPriority(OperationOutcome outcome) {
690 if (outcome == null || outcome.getResult() == null) {
694 switch (outcome.getResult()) {
701 case FAILURE_RETRIES:
707 case FAILURE_TIMEOUT:
710 case FAILURE_EXCEPTION:
717 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
718 * not created until the previous task completes. The pipeline returns the outcome of
719 * the last task executed.
721 * @param futureMakers functions to make the futures
722 * @return a future to cancel the sequence or await the outcome
724 protected CompletableFuture<OperationOutcome> sequence(
725 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
727 return sequence(Arrays.asList(futureMakers));
731 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
732 * not created until the previous task completes. The pipeline returns the outcome of
733 * the last task executed.
735 * @param futureMakers functions to make the futures
736 * @return a future to cancel the sequence or await the outcome, or {@code null} if
737 * there were no tasks to perform
739 protected CompletableFuture<OperationOutcome> sequence(
740 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
742 Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
744 CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
745 if (nextTask == null) {
750 if (queue.isEmpty()) {
751 // only one task - just return it rather than wrapping it in a controller
756 * multiple tasks - need a controller to stop whichever task is currently
759 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
760 final Executor executor = params.getExecutor();
763 controller.wrap(nextTask)
764 .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
765 .whenCompleteAsync(controller.delayedComplete(), executor);
772 * Executes the next task in the queue, if the previous outcome was successful.
774 * @param controller pipeline controller
775 * @param taskQueue queue of tasks to be performed
776 * @return a future to execute the remaining tasks, or the current outcome, if it's a
777 * failure, or if there are no more tasks
779 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
780 PipelineControllerFuture<OperationOutcome> controller,
781 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
784 if (!isSuccess(outcome)) {
785 // return the failure
786 return CompletableFuture.completedFuture(outcome);
789 CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
790 if (nextTask == null) {
791 // no tasks - just return the success
792 return CompletableFuture.completedFuture(outcome);
798 .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
804 * Gets the next task from the queue, skipping those that are {@code null}.
806 * @param taskQueue task queue
807 * @return the next task, or {@code null} if the queue is now empty
809 private CompletableFuture<OperationOutcome> getNextTask(
810 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
812 Supplier<CompletableFuture<OperationOutcome>> maker;
814 while ((maker = taskQueue.poll()) != null) {
815 CompletableFuture<OperationOutcome> future = maker.get();
816 if (future != null) {
825 * Sets the start time of the operation and invokes the callback to indicate that the
826 * operation has started. Does nothing if the pipeline has been stopped.
828 * This assumes that the "outcome" is not {@code null}.
830 * @param callbacks used to determine if the start callback can be invoked
831 * @return a function that sets the start time and invokes the callback
833 private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
835 return (outcome, thrown) -> {
837 if (callbacks.canStart()) {
838 // haven't invoked "start" callback yet
839 outcome.setStart(callbacks.getStartTime());
840 outcome.setEnd(null);
841 params.callbackStarted(outcome);
847 * Sets the end time of the operation and invokes the callback to indicate that the
848 * operation has completed. Does nothing if the pipeline has been stopped.
850 * This assumes that the "outcome" is not {@code null}.
852 * Note: the start time must be a reference rather than a plain value, because it's
853 * value must be gotten on-demand, when the returned function is executed at a later
856 * @param callbacks used to determine if the end callback can be invoked
857 * @return a function that sets the end time and invokes the callback
859 private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
861 return (outcome, thrown) -> {
863 if (callbacks.canEnd()) {
864 outcome.setStart(callbacks.getStartTime());
865 outcome.setEnd(callbacks.getEndTime());
866 params.callbackCompleted(outcome);
872 * Sets an operation's outcome and message, based on a throwable.
874 * @param operation operation to be updated
875 * @return the updated operation
877 protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
878 PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
879 return setOutcome(operation, result);
883 * Sets an operation's outcome and default message based on the result.
885 * @param operation operation to be updated
886 * @param result result of the operation
887 * @return the updated operation
889 public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
890 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
891 operation.setResult(result);
892 operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
893 : ControlLoopOperation.FAILED_MSG);
899 * Determines if a throwable is due to a timeout.
901 * @param thrown throwable of interest
902 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
904 protected boolean isTimeout(Throwable thrown) {
905 if (thrown instanceof CompletionException) {
906 thrown = thrown.getCause();
909 return (thrown instanceof TimeoutException);
913 * Logs a response. If the response is not of type, String, then it attempts to
914 * pretty-print it into JSON before logging.
916 * @param direction IN or OUT
917 * @param infra communication infrastructure on which it was published
918 * @param source source name (e.g., the URL or Topic name)
919 * @param response response to be logged
920 * @return the JSON text that was logged
922 public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
925 if (response == null) {
927 } else if (response instanceof String) {
928 json = response.toString();
930 json = makeCoder().encode(response, true);
933 } catch (CoderException e) {
934 String type = (direction == EventType.IN ? "response" : "request");
935 logger.warn("cannot pretty-print {}", type, e);
936 json = response.toString();
939 logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
944 // these may be overridden by subclasses or junit tests
947 * Gets the retry count.
949 * @param retry retry, extracted from the parameters, or {@code null}
950 * @return the number of retries, or {@code 0} if no retries were specified
952 protected int getRetry(Integer retry) {
953 return (retry == null ? 0 : retry);
957 * Gets the retry wait, in milliseconds.
959 * @return the retry wait, in milliseconds
961 protected long getRetryWaitMs() {
962 return DEFAULT_RETRY_WAIT_MS;
966 * Gets the operation timeout.
968 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
970 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
973 protected long getTimeoutMs(Integer timeoutSec) {
974 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
977 // these may be overridden by junit tests
979 protected Coder makeCoder() {