2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import java.util.List;
 
  24 import java.util.concurrent.CompletableFuture;
 
  25 import java.util.concurrent.CompletionException;
 
  26 import java.util.concurrent.Executor;
 
  27 import java.util.concurrent.TimeUnit;
 
  28 import java.util.concurrent.TimeoutException;
 
  29 import java.util.function.BiConsumer;
 
  30 import java.util.function.Function;
 
  31 import org.onap.policy.controlloop.ControlLoopOperation;
 
  32 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
 
  33 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 
  34 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  35 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  36 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 
  37 import org.onap.policy.controlloop.policy.PolicyResult;
 
  38 import org.slf4j.Logger;
 
  39 import org.slf4j.LoggerFactory;
 
  42  * Partial implementation of an operator. In general, it's preferable that subclasses
 
  43  * would override {@link #startOperationAsync(int, OperationOutcome)
 
  44  * startOperationAsync()}. However, if that proves to be too difficult, then they can
 
  45  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
 
  46  * if the operation requires any preprocessor steps, the subclass may choose to override
 
  47  * {@link #startPreprocessorAsync()}.
 
  49  * The futures returned by the methods within this class can be canceled, and will
 
  50  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
 
  51  * returned by overridden methods will do the same. Of course, if a class overrides
 
  52  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
 
  53  * be done to cancel that particular operation.
 
  55 public abstract class OperationPartial implements Operation {
 
  57     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
 
  58     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
 
  60     // values extracted from the operator
 
  62     private final OperatorPartial operator;
 
  65      * Operation parameters.
 
  67     protected final ControlLoopOperationParams params;
 
  71      * Constructs the object.
 
  73      * @param params operation parameters
 
  74      * @param operator operator that created this operation
 
  76     public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) {
         // NOTE(review): listing line 77 is not present in this extract; the final "params"
         // field is presumably assigned there - confirm against the complete file.
 
  78         this.operator = operator;
 
  81     public Executor getBlockingExecutor() {
 
  82         return operator.getBlockingExecutor();
 
  85     public String getFullName() {
 
  86         return operator.getFullName();
 
  89     public String getActorName() {
 
  90         return operator.getActorName();
 
  93     public String getName() {
 
  94         return operator.getName();
 
  98     public final CompletableFuture<OperationOutcome> start() {
         // Fails fast if the operator is not alive; otherwise runs the optional preprocessor and
         // then attempt #1, all tied to a single controller.
         // NOTE(review): the embedded listing numbers skip 101-102, 110-112, 115-116, 118-119 and
         // 125 onward, so closing braces, comment delimiters and this method's final return are
         // not visible here - confirm against the complete file.
 
  99         if (!operator.isAlive()) {
 
 100             throw new IllegalStateException("operation is not running: " + getFullName());
 
 103         // allocate a controller for the entire operation
 
 104         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 106         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
 
 107         if (preproc == null) {
 
 108             // no preprocessor required - just start the operation
 
 109             return startOperationAttempt(controller, 1);
 
 113          * Do preprocessor first and then, if successful, start the operation. Note:
 
 114          * operations create their own outcome, ignoring the outcome from any previous
 
 117          * Wrap the preprocessor to ensure "stop" is propagated to it.
 
         // preprocessor exceptions are converted to outcomes; handlePreprocessorFailure() decides
         // whether the chain goes on to attempt #1; the chain's result is handed to the controller
 120         controller.wrap(preproc)
 
 121                         .exceptionally(fromException("preprocessor of operation"))
 
 122                         .thenCompose(handlePreprocessorFailure(controller))
 
 123                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
 
 124                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
 
 131      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
 
 132      * invokes the call-backs, marks the controller complete, and returns an incomplete
 
 133      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
 
 136      * Assumes that no callbacks have been invoked yet.
 
 138      * @param controller pipeline controller
 
 139      * @return a function that checks the outcome status and continues, if successful, or
 
 140      *         indicates a failure otherwise
 
 142     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
 
 143                     PipelineControllerFuture<OperationOutcome> controller) {
         // NOTE(review): listing lines 144-146 are absent from this extract; "outcome" below is
         // presumably the parameter of the lambda that this method returns - confirm against the
         // complete file.
 
 147             if (outcome != null && isSuccess(outcome)) {
 
 148                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
 
 149                 return CompletableFuture.completedFuture(outcome);
 
 152             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
 
 154             final Executor executor = params.getExecutor();
 
 155             final CallbackManager callbacks = new CallbackManager();
 
 157             // propagate "stop" to the callbacks
 
 158             controller.add(callbacks);
 
 160             final OperationOutcome outcome2 = params.makeOutcome();
 
 162             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
 
 164             outcome2.setResult(PolicyResult.FAILURE_GUARD);
 
 165             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
 
             // run the "started" and "completed" callbacks with the failure outcome, then hand that
             // outcome to the controller
 168             CompletableFuture.completedFuture(outcome2)
 
 169                             .whenCompleteAsync(callbackStarted(callbacks), executor)
 
 170                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
 
 171                             .whenCompleteAsync(controller.delayedComplete(), executor);
 
             // this future is returned without ever being completed here, which halts the
             // stages chained after the preprocessor
 174             return new CompletableFuture<>();
 
 179      * Invokes the operation's preprocessor step(s) as a "future". This method simply
 
 180      * invokes {@link #startGuardAsync()}.
 
 182      * This method assumes the following:
 
 184      * <li>the operator is alive</li>
 
 185      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
 
 188      * @return a function that will start the preprocessor and returns its outcome, or
 
 189      *         {@code null} if this operation needs no preprocessor
 
 191     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
 
 192         return startGuardAsync();
 
 196      * Invokes the operation's guard step(s) as a "future". This method simply returns
 
 199      * This method assumes the following:
 
 201      * <li>the operator is alive</li>
 
 202      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
 
 205      * @return a future that performs the guard checks and returns their outcome, or
 
 206      *         {@code null} if this operation has no guard
 
 208     protected CompletableFuture<OperationOutcome> startGuardAsync() {
         // NOTE(review): the body (listing line 209 onward) is absent from this extract. The
         // Javadoc's @return mentions null for "no guard", so the default presumably returns
         // null - confirm against the complete file.
 
 213      * Starts the operation attempt, with no preprocessor. When all retries complete, it
 
 214      * will complete the controller.
 
 216      * @param controller controller for all operation attempts
 
 217      * @param attempt attempt number, typically starting with 1
 
 218      * @return a future that will return the final result of all attempts
 
 220     private CompletableFuture<OperationOutcome> startOperationAttempt(
 
 221                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
         // Runs one attempt, lets retryOnFailure() decide what follows, and hands the chain's
         // result to the controller.
         // NOTE(review): listing lines 226-228 (presumably the return statement) are absent from
         // this extract - confirm against the complete file.
 
 223         // propagate "stop" to the operation attempt
 
 224         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
 
 225                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
 
 231      * Starts the operation attempt, without doing any retries.
 
 233      * @param params operation parameters
 
 234      * @param attempt attempt number, typically starting with 1
 
 235      * @return a future that will return the result of a single operation attempt
 
 237     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
         // NOTE(review): the Javadoc above still lists a "params" parameter, but this signature
         // only takes "attempt".
         // NOTE(review): listing lines 278 onward (including the return statement) and several
         // comment delimiters are absent from this extract - confirm against the complete file.
 
 239         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
 
 241         final Executor executor = params.getExecutor();
 
 242         final OperationOutcome outcome = params.makeOutcome();
 
 243         final CallbackManager callbacks = new CallbackManager();
 
 245         // this operation attempt gets its own controller
 
 246         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 248         // propagate "stop" to the callbacks
 
 249         controller.add(callbacks);
 
         // fire the "started" callback, then launch the operation with the fresh outcome
 252         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
 
 253                         .whenCompleteAsync(callbackStarted(callbacks), executor)
 
 254                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
 
 257         // handle timeouts, if specified
 
 258         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
 
 259         if (timeoutMillis > 0) {
 
 260             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
 
             // orTimeout() completes the future with a TimeoutException once the limit passes
 261             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
 
 265          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
 
 266          * before callbackCompleted() is invoked.
 
 268          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
 
 269          * the pipeline as the last step anyway.
 
         // exceptions (timeouts included) become outcomes before the retry flag and callbacks run
 273         future.exceptionally(fromException("operation"))
 
 274                     .thenApply(setRetryFlag(attempt))
 
 275                     .whenCompleteAsync(callbackStarted(callbacks), executor)
 
 276                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
 
 277                     .whenCompleteAsync(controller.delayedComplete(), executor);
 
 284      * Determines if the outcome was successful.
 
 286      * @param outcome outcome to examine
 
 287      * @return {@code true} if the outcome was successful
 
 289     protected boolean isSuccess(OperationOutcome outcome) {
 
 290         return (outcome.getResult() == PolicyResult.SUCCESS);
 
 294      * Determines if the outcome was a failure for this operator.
 
 296      * @param outcome outcome to examine, or {@code null}
 
 297      * @return {@code true} if the outcome is not {@code null} and was a failure
 
 298      *         <i>and</i> was associated with this operator, {@code false} otherwise
 
 300     protected boolean isActorFailed(OperationOutcome outcome) {
 
 301         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
 
 305      * Determines if the given outcome is for this operation.
 
 307      * @param outcome outcome to examine
 
 308      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
 
 310     protected boolean isSameOperation(OperationOutcome outcome) {
 
 311         return OperationOutcome.isFor(outcome, getActorName(), getName());
 
 315      * Invokes the operation as a "future". This method simply invokes
 
 316      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
 
 317      * returning the result via a "future".
 
 319      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
 
 320      * the executor in the "params", as that may bring the background thread pool to a
 
 321      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
 
 324      * This method assumes the following:
 
 326      * <li>the operator is alive</li>
 
 327      * <li>verifyRunning() has been invoked</li>
 
 328      * <li>callbackStarted() has been invoked</li>
 
 329      * <li>the invoker will perform appropriate timeout checks</li>
 
 330      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
 
 333      * @param attempt attempt number, typically starting with 1
 
 334      * @return a function that will start the operation and return its result when
 
 337     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
 339         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
 
 343      * Low-level method that performs the operation. This can make the same assumptions
 
 344      * that are made by {@link #doOperationAsFuture()}. This particular method simply
 
 345      * throws an {@link UnsupportedOperationException}.
 
 347      * @param attempt attempt number, typically starting with 1
 
 348      * @param operation the operation being performed
 
 349      * @return the outcome of the operation
 
 351     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
 
 353         throw new UnsupportedOperationException("start operation " + getFullName());
 
 357      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
 
 358      * FAILURE, assuming the policy specifies retries and the retry count has been
 
 361      * @param attempt latest attempt number, starting with 1
 
 362      * @return a function to get the next future to execute
 
 364     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
         // NOTE(review): several listing lines are absent from this extract (e.g. 371-374,
         // 378-379, 382-383 and 392 onward), so the early return, the assignment to "oper2" in
         // the non-null branch and the lambda's final return are not visible - confirm against
         // the complete file.
 
 366         return operation -> {
 
 367             if (operation != null && !isActorFailed(operation)) {
 
 369                  * wrong type or wrong operation - just leave it as is. No need to log
 
 370                  * anything here, as retryOnFailure() will log a message
 
 375             // get a non-null operation
 
 376             OperationOutcome oper2;
 
 377             if (operation != null) {
 
 380                 oper2 = params.makeOutcome();
 
 381                 oper2.setResult(PolicyResult.FAILURE);
 
 384             int retry = getRetry(params.getRetry());
 
             // "attempt" starts at 1 for the initial try, so attempt > retry means that the
             // initial try plus every configured retry has already been made
 385             if (retry > 0 && attempt > retry) {
 
 387                  * retries were specified and we've already tried them all - change to
 
 390                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
 
 391                 oper2.setResult(PolicyResult.FAILURE_RETRIES);
 
 399      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
 
 400      * was previously invoked, and thus that the "operation" is not {@code null}.
 
 402      * @param controller controller for all of the retries
 
 403      * @param attempt latest attempt number, starting with 1
 
 404      * @return a function to get the next future to execute
 
 406     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
 
 407                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
         // Decides what follows a finished attempt: either complete the controller with the
         // outcome, or wait and then start the next attempt.
         // NOTE(review): closing braces and comment delimiters (e.g. listing lines 415-416,
         // 422-424, 426, 432 onward) are absent from this extract.
 
 409         return operation -> {
 
 410             if (!isActorFailed(operation)) {
 
 411                 // wrong type or wrong operation - just leave it as is
 
 412                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
 
 413                 controller.complete(operation);
 
                 // the controller now holds the result; the future returned here is left incomplete
 414                 return new CompletableFuture<>();
 
 417             if (getRetry(params.getRetry()) <= 0) {
 
 418                 // no retries - already marked as FAILURE, so just return it
 
 419                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
 
 420                 controller.complete(operation);
 
 421                 return new CompletableFuture<>();
 
 425              * Retry the operation.
 
 427             long waitMs = getRetryWaitMs();
 
 428             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
 
             // after the wait, the next attempt reuses the same overall controller
 430             return sleep(waitMs, TimeUnit.MILLISECONDS)
 
 431                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
 
 436      * Convenience method that starts a sleep(), running via a future.
 
 438      * @param sleepTime time to sleep
 
 439      * @param unit time unit
 
 440      * @return a future that will complete when the sleep completes
 
 442     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
 
 443         if (sleepTime <= 0) {
 
 444             return CompletableFuture.completedFuture(null);
 
 447         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
 
 451      * Converts an exception into an operation outcome, returning a copy of the outcome to
 
 452      * prevent background jobs from changing it.
 
 454      * @param type type of item throwing the exception
 
 455      * @return a function that will convert an exception into an operation outcome
 
 457     private Function<Throwable, OperationOutcome> fromException(String type) {
         // NOTE(review): listing lines 458-459 are absent from this extract; "thrown" below is
         // presumably the parameter of the lambda that this method returns - confirm against the
         // complete file.
 
 460             OperationOutcome outcome = params.makeOutcome();
 
             // NOTE(review): the message text reads "exception throw by" - probably meant "thrown".
             // The trailing "thrown" argument has no placeholder, so SLF4J logs it as the exception.
 462             logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
 
 463                             params.getRequestId(), thrown);
 
 465             return setOutcome(outcome, thrown);
 
 470      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
 
 471      * any outstanding futures when one completes.
 
 473      * @param futures futures for which to wait
 
 474      * @return a future to cancel or await an outcome. If this future is canceled, then
 
 475      *         all of the futures will be canceled
 
 477     protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) {
         // Adapts the list to the varargs overload of anyOf().
         // NOTE(review): listing lines 485-486 (presumably returning "result") are absent from
         // this extract - confirm against the complete file.
 
 479         // convert list to an array
 
 480         @SuppressWarnings("rawtypes")
 
 481         CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
 
 483         @SuppressWarnings("unchecked")
 
 484         CompletableFuture<OperationOutcome> result = anyOf(arrFutures);
 
 489      * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
 
 490      * outstanding futures when one completes.
 
 492      * @param futures futures for which to wait
 
 493      * @return a future to cancel or await an outcome. If this future is canceled, then
 
 494      *         all of the futures will be canceled
 
 496     protected CompletableFuture<OperationOutcome> anyOf(
 
 497                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
         // NOTE(review): listing lines 500-502 (the body of the single-future shortcut) and 512
         // onward (the return statement) are absent from this extract - confirm against the
         // complete file.
 
 499         if (futures.length == 1) {
 
 503         final Executor executor = params.getExecutor();
 
 504         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 506         attachFutures(controller, futures);
 
         // the first future to complete supplies the result, which is handed to the controller
 509         CompletableFuture.anyOf(futures)
 
 510                             .thenApply(object -> (OperationOutcome) object)
 
 511                             .whenCompleteAsync(controller.delayedComplete(), executor);
 
 518      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
 
 519      * the futures if returned future is canceled. The future returns the "worst" outcome,
 
 520      * based on priority (see {@link #detmPriority(OperationOutcome)}).
 
 522      * @param futures futures for which to wait
 
 523      * @return a future to cancel or await an outcome. If this future is canceled, then
 
 524      *         all of the futures will be canceled
 
 526     protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) {
         // Adapts the list to the varargs overload of allOf().
         // NOTE(review): listing lines 534-535 (presumably returning "result") are absent from
         // this extract - confirm against the complete file.
 
 528         // convert list to an array
 
 529         @SuppressWarnings("rawtypes")
 
 530         CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
 
 532         @SuppressWarnings("unchecked")
 
 533         CompletableFuture<OperationOutcome> result = allOf(arrFutures);
 
 538      * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
 
 539      * futures if returned future is canceled. The future returns the "worst" outcome,
 
 540      * based on priority (see {@link #detmPriority(OperationOutcome)}).
 
 542      * @param futures futures for which to wait
 
 543      * @return a future to cancel or await an outcome. If this future is canceled, then
 
 544      *         all of the futures will be canceled
 
 546     protected CompletableFuture<OperationOutcome> allOf(
 
 547                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
         // NOTE(review): listing lines 550-552 (the body of the single-future shortcut) and 572
         // onward (the return statement) are absent from this extract - confirm against the
         // complete file.
 
 549         if (futures.length == 1) {
 
 553         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 555         attachFutures(controller, futures);
 
 557         OperationOutcome[] outcomes = new OperationOutcome[futures.length];
 
 559         @SuppressWarnings("rawtypes")
 
 560         CompletableFuture[] futures2 = new CompletableFuture[futures.length];
 
 562         // record the outcomes of each future when it completes
 
 563         for (int count = 0; count < futures2.length; ++count) {
 
             // lambdas may only capture effectively-final locals, hence the copy of the index
 564             final int count2 = count;
 
 565             futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
 
         // once every future is done, reduce the recorded outcomes to a single combined outcome
 569         CompletableFuture.allOf(futures2)
 
 570                         .thenApply(unused -> combineOutcomes(outcomes))
 
 571                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
 
 578      * Attaches the given futures to the controller.
 
 580      * @param controller master controller for all of the futures
 
 581      * @param futures futures to be attached to the controller
 
 583     private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
 
 584                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
         // Rejects an empty array, then adds every future to the controller.
         // NOTE(review): listing lines 588-590 and 593 onward are absent from this extract.
 
 586         if (futures.length == 0) {
 
 587             throw new IllegalArgumentException("empty list of futures");
 
 591         for (CompletableFuture<OperationOutcome> future : futures) {
 
 592             controller.add(future);
 
 597      * Combines the outcomes from a set of tasks.
 
 599      * @param outcomes outcomes to be examined
 
 600      * @return the combined outcome
 
 602     private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) {
         // NOTE(review): listing line 614 (presumably replacing "outcome" with "outcome2" inside
         // the if) and lines 621 onward (presumably the return) are absent from this extract -
         // confirm against the complete file.
 
 604         // identify the outcome with the highest priority
 
 605         OperationOutcome outcome = outcomes[0];
 
 606         int priority = detmPriority(outcome);
 
 608         // start with "1", as we've already dealt with "0"
 
 609         for (int count = 1; count < outcomes.length; ++count) {
 
 610             OperationOutcome outcome2 = outcomes[count];
 
 611             int priority2 = detmPriority(outcome2);
 
             // strictly greater: on a tie the earlier entry's priority is kept
 613             if (priority2 > priority) {
 
 615                 priority = priority2;
 
         // "outcome" may be null here, hence the null check when logging its result
 619         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
 
 620                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
 
 626      * Determines the priority of an outcome based on its result.
 
 628      * @param outcome outcome to examine, or {@code null}
 
 629      * @return the outcome's priority
 
 631     protected int detmPriority(OperationOutcome outcome) {
         // NOTE(review): every return statement and several case labels (listing lines 633-635,
         // 637-642, 644-648, 650-651 and 653 onward) are absent from this extract, so the actual
         // priority assigned to each result cannot be read from here - confirm against the
         // complete file.
 
 632         if (outcome == null || outcome.getResult() == null) {
 
 636         switch (outcome.getResult()) {
 
 643             case FAILURE_RETRIES:
 
 649             case FAILURE_TIMEOUT:
 
 652             case FAILURE_EXCEPTION:
 
 659      * Performs a task, after verifying that the controller is still running. Also checks
 
 660      * that the previous outcome was successful, if specified.
 
 662      * @param controller overall pipeline controller
 
 663      * @param checkSuccess {@code true} to check the previous outcome, {@code false}
 
 665      * @param outcome outcome of the previous task
 
 666      * @param task task to be performed
 
 667      * @return the task, if everything checks out. Otherwise, it returns an incomplete
 
 668      *         future and completes the controller instead
 
 671     protected CompletableFuture<OperationOutcome> doTask(
 
 672                     PipelineControllerFuture<OperationOutcome> controller,
 
 673                     boolean checkSuccess, OperationOutcome outcome,
 
 674                     CompletableFuture<OperationOutcome> task) {
         // NOTE(review): the Javadoc says the controller is verified to be running, but no such
         // check is visible in this overload (the Function-based overload does test
         // controller.isRunning()). Listing lines 675-676 and 683 are absent from this extract, so
         // a statement may be hidden there - confirm against the complete file.
 
 677         if (checkSuccess && !isSuccess(outcome)) {
 
 679              * must complete before canceling so that cancel() doesn't cause controller to
 
 682             controller.complete(outcome);
 
             // the controller holds the result; the future returned here is left incomplete
 684             return new CompletableFuture<>();
 
 687         return controller.wrap(task);
 
 691      * Performs a task, after verifying that the controller is still running. Also checks
 
 692      * that the previous outcome was successful, if specified.
 
 694      * @param controller overall pipeline controller
 
 695      * @param checkSuccess {@code true} to check the previous outcome, {@code false}
 
 697      * @param task function to start the task to be performed
 
 698      * @return a function to perform the task. If everything checks out, then it returns
 
 699      *         the task. Otherwise, it returns an incomplete future and completes the
 
 703     protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(
 
 704                     PipelineControllerFuture<OperationOutcome> controller,
 
 705                     boolean checkSuccess,
 
 706                     Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
         // NOTE(review): listing lines 707-710 are absent from this extract; "outcome" below is
         // presumably the parameter of the lambda that this method returns - confirm against the
         // complete file.
 
             // controller is no longer running: return an incomplete future without starting the task
 711             if (!controller.isRunning()) {
 
 712                 return new CompletableFuture<>();
 
 715             if (checkSuccess && !isSuccess(outcome)) {
 
 716                 controller.complete(outcome);
 
 717                 return new CompletableFuture<>();
 
             // the task is only created here, after both checks have passed
 720             return controller.wrap(task.apply(outcome));
 
 725      * Sets the start time of the operation and invokes the callback to indicate that the
 
 726      * operation has started. Does nothing if the pipeline has been stopped.
 
 728      * This assumes that the "outcome" is not {@code null}.
 
 730      * @param callbacks used to determine if the start callback can be invoked
 
 731      * @return a function that sets the start time and invokes the callback
 
 733     private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
         // In the lines visible here, the returned consumer never reads "thrown" and acts only
         // when callbacks.canStart() returns true.
         // NOTE(review): listing lines 742 onward (closing braces) are absent from this extract.
 
 735         return (outcome, thrown) -> {
 
 737             if (callbacks.canStart()) {
 
 738                 // haven't invoked "start" callback yet
 
 739                 outcome.setStart(callbacks.getStartTime());
 
 740                 outcome.setEnd(null);
 
 741                 params.callbackStarted(outcome);
 
 747      * Sets the end time of the operation and invokes the callback to indicate that the
 
 748      * operation has completed. Does nothing if the pipeline has been stopped.
 
 750      * This assumes that the "outcome" is not {@code null}.
 
 752      * Note: the start time must be a reference rather than a plain value, because its
 
 753      * value must be gotten on-demand, when the returned function is executed at a later
 
 756      * @param callbacks used to determine if the end callback can be invoked
 
 757      * @return a function that sets the end time and invokes the callback
 
 759     private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
         // In the lines visible here, the returned consumer never reads "thrown" and acts only
         // when callbacks.canEnd() returns true.
         // NOTE(review): listing lines 767 onward (closing braces) are absent from this extract.
 
 761         return (outcome, thrown) -> {
 
 763             if (callbacks.canEnd()) {
 
                 // both timestamps are read from the callback manager when this consumer runs
 764                 outcome.setStart(callbacks.getStartTime());
 
 765                 outcome.setEnd(callbacks.getEndTime());
 
 766                 params.callbackCompleted(outcome);
 
 772      * Sets an operation's outcome and message, based on a throwable.
 
 774      * @param operation operation to be updated
 
 775      * @return the updated operation
 
 777     protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
 
 778         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
 
 779         return setOutcome(operation, result);
 
 783      * Sets an operation's outcome and default message based on the result.
 
 785      * @param operation operation to be updated
 
 786      * @param result result of the operation
 
 787      * @return the updated operation
 
 789     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
         // NOTE(review): listing lines 794-796 (presumably returning "operation") are absent from
         // this extract - confirm against the complete file.
 
 790         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
 
 791         operation.setResult(result);
 
         // the default message depends only on whether the result is SUCCESS
 792         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
 
 793                         : ControlLoopOperation.FAILED_MSG);
 
 799      * Determines if a throwable is due to a timeout.
 
 801      * @param thrown throwable of interest
 
 802      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
 
 804     protected boolean isTimeout(Throwable thrown) {
 
 805         if (thrown instanceof CompletionException) {
 
 806             thrown = thrown.getCause();
 
 809         return (thrown instanceof TimeoutException);
 
 812     // these may be overridden by subclasses or junit tests
 
 815      * Gets the retry count.
 
 817      * @param retry retry, extracted from the parameters, or {@code null}
 
 818      * @return the number of retries, or {@code 0} if no retries were specified
 
 820     protected int getRetry(Integer retry) {
 
 821         return (retry == null ? 0 : retry);
 
 825      * Gets the retry wait, in milliseconds.
 
 827      * @return the retry wait, in milliseconds
 
 829     protected long getRetryWaitMs() {
 
 830         return DEFAULT_RETRY_WAIT_MS;
 
 834      * Gets the operation timeout.
 
 836      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
 
 838      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
 
 841     protected long getTimeoutMs(Integer timeoutSec) {
 
 842         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));