2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.impl;
23 import java.util.ArrayDeque;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.LinkedList;
27 import java.util.List;
28 import java.util.Queue;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.CompletionException;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import java.util.function.BiConsumer;
35 import java.util.function.Function;
36 import java.util.function.Supplier;
37 import java.util.function.UnaryOperator;
39 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
40 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
41 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
42 import org.onap.policy.common.utils.coder.Coder;
43 import org.onap.policy.common.utils.coder.CoderException;
44 import org.onap.policy.common.utils.coder.StandardCoder;
45 import org.onap.policy.controlloop.ControlLoopOperation;
46 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
47 import org.onap.policy.controlloop.actorserviceprovider.Operation;
48 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
49 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
50 import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
51 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
52 import org.onap.policy.controlloop.policy.PolicyResult;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * Partial implementation of an operator. In general, it's preferable that subclasses
58 * would override {@link #startOperationAsync(int, OperationOutcome)
59 * startOperationAsync()}. However, if that proves to be too difficult, then they can
60 * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
61 * if the operation requires any preprocessor steps, the subclass may choose to override
62 * {@link #startPreprocessorAsync()}.
64 * The futures returned by the methods within this class can be canceled, and will
65 * propagate the cancellation to any subtasks. Thus it is also expected that any futures
66 * returned by overridden methods will do the same. Of course, if a class overrides
67 * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
68 * be done to cancel that particular operation.
70 public abstract class OperationPartial implements Operation {
71 private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
72 private static final Coder coder = new StandardCoder();
74 public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
76 private final OperatorConfig config;
79 * Operation parameters.
81 protected final ControlLoopOperationParams params;
84 private final String fullName;
88 * Constructs the object.
90 * @param params operation parameters
91 * @param config configuration for this operation
93 public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
96 this.fullName = params.getActor() + "." + params.getOperation();
99 public Executor getBlockingExecutor() {
100 return config.getBlockingExecutor();
103 public String getActorName() {
104 return params.getActor();
107 public String getName() {
108 return params.getOperation();
112 public final CompletableFuture<OperationOutcome> start() {
113 // allocate a controller for the entire operation
114 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
116 CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
117 if (preproc == null) {
118 // no preprocessor required - just start the operation
119 return startOperationAttempt(controller, 1);
123 * Do preprocessor first and then, if successful, start the operation. Note:
124 * operations create their own outcome, ignoring the outcome from any previous
127 * Wrap the preprocessor to ensure "stop" is propagated to it.
130 controller.wrap(preproc)
131 .exceptionally(fromException("preprocessor of operation"))
132 .thenCompose(handlePreprocessorFailure(controller))
133 .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
134 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
141 * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
142 * invokes the call-backs, marks the controller complete, and returns an incomplete
143 * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
146 * Assumes that no callbacks have been invoked yet.
148 * @param controller pipeline controller
149 * @return a function that checks the outcome status and continues, if successful, or
150 * indicates a failure otherwise
152 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
153 PipelineControllerFuture<OperationOutcome> controller) {
157 if (outcome != null && isSuccess(outcome)) {
158 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
159 return CompletableFuture.completedFuture(outcome);
162 logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
164 final Executor executor = params.getExecutor();
165 final CallbackManager callbacks = new CallbackManager();
167 // propagate "stop" to the callbacks
168 controller.add(callbacks);
170 final OperationOutcome outcome2 = params.makeOutcome();
172 // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
174 outcome2.setResult(PolicyResult.FAILURE_GUARD);
175 outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
178 CompletableFuture.completedFuture(outcome2)
179 .whenCompleteAsync(callbackStarted(callbacks), executor)
180 .whenCompleteAsync(callbackCompleted(callbacks), executor)
181 .whenCompleteAsync(controller.delayedComplete(), executor);
184 return new CompletableFuture<>();
189 * Invokes the operation's preprocessor step(s) as a "future". This method simply
190 * invokes {@link #startGuardAsync()}.
192 * This method assumes the following:
194 * <li>the operator is alive</li>
195 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
198 * @return a function that will start the preprocessor and returns its outcome, or
199 * {@code null} if this operation needs no preprocessor
201 protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
202 return startGuardAsync();
206 * Invokes the operation's guard step(s) as a "future". This method simply returns
209 * This method assumes the following:
211 * <li>the operator is alive</li>
212 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
215 * @return a function that will start the guard checks and returns its outcome, or
216 * {@code null} if this operation has no guard
218 protected CompletableFuture<OperationOutcome> startGuardAsync() {
223 * Starts the operation attempt, with no preprocessor. When all retries complete, it
224 * will complete the controller.
226 * @param controller controller for all operation attempts
227 * @param attempt attempt number, typically starting with 1
228 * @return a future that will return the final result of all attempts
230 private CompletableFuture<OperationOutcome> startOperationAttempt(
231 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
233 // propagate "stop" to the operation attempt
234 controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
235 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
241 * Starts the operation attempt, without doing any retries.
243 * @param params operation parameters
244 * @param attempt attempt number, typically starting with 1
245 * @return a future that will return the result of a single operation attempt
247 private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
249 logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
251 final Executor executor = params.getExecutor();
252 final OperationOutcome outcome = params.makeOutcome();
253 final CallbackManager callbacks = new CallbackManager();
255 // this operation attempt gets its own controller
256 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
258 // propagate "stop" to the callbacks
259 controller.add(callbacks);
262 CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
263 .whenCompleteAsync(callbackStarted(callbacks), executor)
264 .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
267 // handle timeouts, if specified
268 long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
269 if (timeoutMillis > 0) {
270 logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
271 future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
275 * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
276 * before callbackCompleted() is invoked.
278 * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
279 * the pipeline as the last step anyway.
283 future.exceptionally(fromException("operation"))
284 .thenApply(setRetryFlag(attempt))
285 .whenCompleteAsync(callbackStarted(callbacks), executor)
286 .whenCompleteAsync(callbackCompleted(callbacks), executor)
287 .whenCompleteAsync(controller.delayedComplete(), executor);
294 * Determines if the outcome was successful.
296 * @param outcome outcome to examine
297 * @return {@code true} if the outcome was successful
299 protected boolean isSuccess(OperationOutcome outcome) {
300 return (outcome.getResult() == PolicyResult.SUCCESS);
304 * Determines if the outcome was a failure for this operator.
306 * @param outcome outcome to examine, or {@code null}
307 * @return {@code true} if the outcome is not {@code null} and was a failure
308 * <i>and</i> was associated with this operator, {@code false} otherwise
310 protected boolean isActorFailed(OperationOutcome outcome) {
311 return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
315 * Determines if the given outcome is for this operation.
317 * @param outcome outcome to examine
318 * @return {@code true} if the outcome is for this operation, {@code false} otherwise
320 protected boolean isSameOperation(OperationOutcome outcome) {
321 return OperationOutcome.isFor(outcome, getActorName(), getName());
325 * Invokes the operation as a "future". This method simply invokes
326 * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
327 * returning the result via a "future".
329 * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
330 * the executor in the "params", as that may bring the background thread pool to a
331 * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
334 * This method assumes the following:
336 * <li>the operator is alive</li>
337 * <li>verifyRunning() has been invoked</li>
338 * <li>callbackStarted() has been invoked</li>
339 * <li>the invoker will perform appropriate timeout checks</li>
340 * <li>exceptions generated within the pipeline will be handled by the invoker</li>
343 * @param attempt attempt number, typically starting with 1
344 * @return a function that will start the operation and return its result when
347 protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
349 return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
353 * Low-level method that performs the operation. This can make the same assumptions
354 * that are made by {@link #doOperationAsFuture()}. This particular method simply
355 * throws an {@link UnsupportedOperationException}.
357 * @param attempt attempt number, typically starting with 1
358 * @param operation the operation being performed
359 * @return the outcome of the operation
361 protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
363 throw new UnsupportedOperationException("start operation " + getFullName());
367 * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
368 * FAILURE, assuming the policy specifies retries and the retry count has been
371 * @param attempt latest attempt number, starting with 1
372 * @return a function to get the next future to execute
374 private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
376 return operation -> {
377 if (operation != null && !isActorFailed(operation)) {
379 * wrong type or wrong operation - just leave it as is. No need to log
380 * anything here, as retryOnFailure() will log a message
385 // get a non-null operation
386 OperationOutcome oper2;
387 if (operation != null) {
390 oper2 = params.makeOutcome();
391 oper2.setResult(PolicyResult.FAILURE);
394 int retry = getRetry(params.getRetry());
395 if (retry > 0 && attempt > retry) {
397 * retries were specified and we've already tried them all - change to
400 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
401 oper2.setResult(PolicyResult.FAILURE_RETRIES);
409 * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
410 * was previously invoked, and thus that the "operation" is not {@code null}.
412 * @param controller controller for all of the retries
413 * @param attempt latest attempt number, starting with 1
414 * @return a function to get the next future to execute
416 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
417 PipelineControllerFuture<OperationOutcome> controller, int attempt) {
419 return operation -> {
420 if (!isActorFailed(operation)) {
421 // wrong type or wrong operation - just leave it as is
422 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
423 controller.complete(operation);
424 return new CompletableFuture<>();
427 if (getRetry(params.getRetry()) <= 0) {
428 // no retries - already marked as FAILURE, so just return it
429 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
430 controller.complete(operation);
431 return new CompletableFuture<>();
435 * Retry the operation.
437 long waitMs = getRetryWaitMs();
438 logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
440 return sleep(waitMs, TimeUnit.MILLISECONDS)
441 .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
446 * Convenience method that starts a sleep(), running via a future.
448 * @param sleepTime time to sleep
449 * @param unit time unit
450 * @return a future that will complete when the sleep completes
452 protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
453 if (sleepTime <= 0) {
454 return CompletableFuture.completedFuture(null);
457 return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
461 * Converts an exception into an operation outcome, returning a copy of the outcome to
462 * prevent background jobs from changing it.
464 * @param type type of item throwing the exception
465 * @return a function that will convert an exception into an operation outcome
467 private Function<Throwable, OperationOutcome> fromException(String type) {
470 OperationOutcome outcome = params.makeOutcome();
472 logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
473 params.getRequestId(), thrown);
475 return setOutcome(outcome, thrown);
480 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
481 * any outstanding futures when one completes.
483 * @param futureMakers function to make a future. If the function returns
484 * {@code null}, then no future is created for that function. On the other
485 * hand, if the function throws an exception, then the previously created
486 * functions are canceled and the exception is re-thrown
487 * @return a future to cancel or await an outcome, or {@code null} if no futures were
488 * created. If this future is canceled, then all of the futures will be
491 protected CompletableFuture<OperationOutcome> anyOf(
492 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
494 return anyOf(Arrays.asList(futureMakers));
498 * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
499 * any outstanding futures when one completes.
501 * @param futureMakers function to make a future. If the function returns
502 * {@code null}, then no future is created for that function. On the other
503 * hand, if the function throws an exception, then the previously created
504 * functions are canceled and the exception is re-thrown
505 * @return a future to cancel or await an outcome, or {@code null} if no futures were
506 * created. If this future is canceled, then all of the futures will be
507 * canceled. Similarly, when this future completes, any incomplete futures
510 protected CompletableFuture<OperationOutcome> anyOf(
511 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
513 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
515 CompletableFuture<OperationOutcome>[] futures =
516 attachFutures(controller, futureMakers, UnaryOperator.identity());
518 if (futures.length == 0) {
519 // no futures were started
523 if (futures.length == 1) {
527 CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
528 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
534 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
536 * @param futureMakers function to make a future. If the function returns
537 * {@code null}, then no future is created for that function. On the other
538 * hand, if the function throws an exception, then the previously created
539 * functions are canceled and the exception is re-thrown
540 * @return a future to cancel or await an outcome, or {@code null} if no futures were
541 * created. If this future is canceled, then all of the futures will be
544 protected CompletableFuture<OperationOutcome> allOf(
545 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
547 return allOf(Arrays.asList(futureMakers));
551 * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
553 * @param futureMakers function to make a future. If the function returns
554 * {@code null}, then no future is created for that function. On the other
555 * hand, if the function throws an exception, then the previously created
556 * functions are canceled and the exception is re-thrown
557 * @return a future to cancel or await an outcome, or {@code null} if no futures were
558 * created. If this future is canceled, then all of the futures will be
559 * canceled. Similarly, when this future completes, any incomplete futures
562 protected CompletableFuture<OperationOutcome> allOf(
563 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
564 PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
566 Queue<OperationOutcome> outcomes = new LinkedList<>();
568 CompletableFuture<OperationOutcome>[] futures =
569 attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
570 synchronized (outcomes) {
571 outcomes.add(outcome);
576 if (futures.length == 0) {
577 // no futures were started
581 if (futures.length == 1) {
586 CompletableFuture.allOf(futures)
587 .thenApply(unused -> combineOutcomes(outcomes))
588 .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
595 * Invokes the functions to create the futures and attaches them to the controller.
597 * @param controller master controller for all of the futures
598 * @param futureMakers futures to be attached to the controller
599 * @param adorn function that "adorns" the future, possible adding onto its pipeline.
600 * Returns the adorned future
601 * @return an array of futures, possibly zero-length. If the array is of size one,
602 * then that one item should be returned instead of the controller
604 private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
605 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
606 UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
608 if (futureMakers.isEmpty()) {
609 @SuppressWarnings("unchecked")
610 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
614 // the last, unadorned future that is created
615 CompletableFuture<OperationOutcome> lastFuture = null;
617 List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
620 for (var maker : futureMakers) {
622 CompletableFuture<OperationOutcome> future = maker.get();
623 if (future == null) {
627 // propagate "stop" to the future
628 controller.add(future);
630 futures.add(adorn.apply(future));
634 } catch (RuntimeException e) {
635 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
636 controller.cancel(false);
641 @SuppressWarnings("unchecked")
642 CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
644 if (result.length == 1) {
645 // special case - return the unadorned future
646 result[0] = lastFuture;
650 return futures.toArray(result);
654 * Combines the outcomes from a set of tasks.
656 * @param outcomes outcomes to be examined
657 * @return the combined outcome
659 private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
661 // identify the outcome with the highest priority
662 OperationOutcome outcome = outcomes.remove();
663 int priority = detmPriority(outcome);
665 for (OperationOutcome outcome2 : outcomes) {
666 int priority2 = detmPriority(outcome2);
668 if (priority2 > priority) {
670 priority = priority2;
674 logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
675 (outcome == null ? null : outcome.getResult()), params.getRequestId());
681 * Determines the priority of an outcome based on its result.
683 * @param outcome outcome to examine, or {@code null}
684 * @return the outcome's priority
686 protected int detmPriority(OperationOutcome outcome) {
687 if (outcome == null || outcome.getResult() == null) {
691 switch (outcome.getResult()) {
698 case FAILURE_RETRIES:
704 case FAILURE_TIMEOUT:
707 case FAILURE_EXCEPTION:
714 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
715 * not created until the previous task completes. The pipeline returns the outcome of
716 * the last task executed.
718 * @param futureMakers functions to make the futures
719 * @return a future to cancel the sequence or await the outcome
721 protected CompletableFuture<OperationOutcome> sequence(
722 @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
724 return sequence(Arrays.asList(futureMakers));
728 * Performs a sequence of tasks, stopping if a task fails. A given task's future is
729 * not created until the previous task completes. The pipeline returns the outcome of
730 * the last task executed.
732 * @param futureMakers functions to make the futures
733 * @return a future to cancel the sequence or await the outcome, or {@code null} if
734 * there were no tasks to perform
736 protected CompletableFuture<OperationOutcome> sequence(
737 List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
739 Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
741 CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
742 if (nextTask == null) {
747 if (queue.isEmpty()) {
748 // only one task - just return it rather than wrapping it in a controller
753 * multiple tasks - need a controller to stop whichever task is currently
756 final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
757 final Executor executor = params.getExecutor();
760 controller.wrap(nextTask)
761 .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
762 .whenCompleteAsync(controller.delayedComplete(), executor);
769 * Executes the next task in the queue, if the previous outcome was successful.
771 * @param controller pipeline controller
772 * @param taskQueue queue of tasks to be performed
773 * @return a future to execute the remaining tasks, or the current outcome, if it's a
774 * failure, or if there are no more tasks
776 private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
777 PipelineControllerFuture<OperationOutcome> controller,
778 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
781 if (!isSuccess(outcome)) {
782 // return the failure
783 return CompletableFuture.completedFuture(outcome);
786 CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
787 if (nextTask == null) {
788 // no tasks - just return the success
789 return CompletableFuture.completedFuture(outcome);
795 .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
801 * Gets the next task from the queue, skipping those that are {@code null}.
803 * @param taskQueue task queue
804 * @return the next task, or {@code null} if the queue is now empty
806 private CompletableFuture<OperationOutcome> getNextTask(
807 Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
809 Supplier<CompletableFuture<OperationOutcome>> maker;
811 while ((maker = taskQueue.poll()) != null) {
812 CompletableFuture<OperationOutcome> future = maker.get();
813 if (future != null) {
822 * Sets the start time of the operation and invokes the callback to indicate that the
823 * operation has started. Does nothing if the pipeline has been stopped.
825 * This assumes that the "outcome" is not {@code null}.
827 * @param callbacks used to determine if the start callback can be invoked
828 * @return a function that sets the start time and invokes the callback
830 private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
832 return (outcome, thrown) -> {
834 if (callbacks.canStart()) {
835 // haven't invoked "start" callback yet
836 outcome.setStart(callbacks.getStartTime());
837 outcome.setEnd(null);
838 params.callbackStarted(outcome);
844 * Sets the end time of the operation and invokes the callback to indicate that the
845 * operation has completed. Does nothing if the pipeline has been stopped.
847 * This assumes that the "outcome" is not {@code null}.
849 * Note: the start time must be a reference rather than a plain value, because it's
850 * value must be gotten on-demand, when the returned function is executed at a later
853 * @param callbacks used to determine if the end callback can be invoked
854 * @return a function that sets the end time and invokes the callback
856 private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
858 return (outcome, thrown) -> {
860 if (callbacks.canEnd()) {
861 outcome.setStart(callbacks.getStartTime());
862 outcome.setEnd(callbacks.getEndTime());
863 params.callbackCompleted(outcome);
869 * Sets an operation's outcome and message, based on a throwable.
871 * @param operation operation to be updated
872 * @return the updated operation
874 protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
875 PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
876 return setOutcome(operation, result);
880 * Sets an operation's outcome and default message based on the result.
882 * @param operation operation to be updated
883 * @param result result of the operation
884 * @return the updated operation
886 public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
887 logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
888 operation.setResult(result);
889 operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
890 : ControlLoopOperation.FAILED_MSG);
896 * Determines if a throwable is due to a timeout.
898 * @param thrown throwable of interest
899 * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
901 protected boolean isTimeout(Throwable thrown) {
902 if (thrown instanceof CompletionException) {
903 thrown = thrown.getCause();
906 return (thrown instanceof TimeoutException);
910 * Logs a response. If the response is not of type, String, then it attempts to
911 * pretty-print it into JSON before logging.
913 * @param direction IN or OUT
914 * @param infra communication infrastructure on which it was published
915 * @param source source name (e.g., the URL or Topic name)
916 * @param response response to be logged
917 * @return the JSON text that was logged
919 public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
922 if (response == null) {
924 } else if (response instanceof String) {
925 json = response.toString();
927 json = makeCoder().encode(response, true);
930 } catch (CoderException e) {
931 String type = (direction == EventType.IN ? "response" : "request");
932 logger.warn("cannot pretty-print {}", type, e);
933 json = response.toString();
936 logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
941 // these may be overridden by subclasses or junit tests
944 * Gets the retry count.
946 * @param retry retry, extracted from the parameters, or {@code null}
947 * @return the number of retries, or {@code 0} if no retries were specified
949 protected int getRetry(Integer retry) {
950 return (retry == null ? 0 : retry);
954 * Gets the retry wait, in milliseconds.
956 * @return the retry wait, in milliseconds
958 protected long getRetryWaitMs() {
959 return DEFAULT_RETRY_WAIT_MS;
963 * Gets the operation timeout.
965 * @param timeoutSec timeout, in seconds, extracted from the parameters, or
967 * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
970 protected long getTimeoutMs(Integer timeoutSec) {
971 return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
974 // these may be overridden by junit tests
976 protected Coder makeCoder() {