2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import java.util.List;
 
  25 import java.util.concurrent.CompletableFuture;
 
  26 import java.util.concurrent.CompletionException;
 
  27 import java.util.concurrent.Executor;
 
  28 import java.util.concurrent.TimeUnit;
 
  29 import java.util.concurrent.TimeoutException;
 
  30 import java.util.function.BiConsumer;
 
  31 import java.util.function.Function;
 
  32 import lombok.AccessLevel;
 
  35 import org.onap.policy.controlloop.ControlLoopOperation;
 
  36 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
 
  37 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  38 import org.onap.policy.controlloop.actorserviceprovider.Operator;
 
  39 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  40 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 
  41 import org.onap.policy.controlloop.policy.PolicyResult;
 
  42 import org.slf4j.Logger;
 
  43 import org.slf4j.LoggerFactory;
 
  46  * Partial implementation of an operator. In general, it's preferable that subclasses
 
  48  * {@link #startOperationAsync(ControlLoopOperationParams, int, OperationOutcome)
 
  49  * startOperationAsync()}. However, if that proves to be too difficult, then they can
 
  50  * simply override {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome)
 
  51  * doOperation()}. In addition, if the operation requires any preprocessor steps, the
 
  52  * subclass may choose to override
 
  53  * {@link #startPreprocessorAsync(ControlLoopOperationParams) startPreprocessorAsync()}.
 
  55  * The futures returned by the methods within this class can be canceled, and will
 
  56  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
 
  57  * returned by overridden methods will do the same. Of course, if a class overrides
 
  58  * {@link #doOperation(ControlLoopOperationParams, int, OperationOutcome) doOperation()},
 
  59  * then there's little that can be done to cancel that particular operation.
 
  61 public abstract class OperatorPartial extends StartConfigPartial<Map<String, Object>> implements Operator {
 
  63     private static final Logger logger = LoggerFactory.getLogger(OperatorPartial.class);
 
  66      * Executor to be used for tasks that may perform blocking I/O. The default executor
 
  67      * simply launches a new thread for each command that is submitted to it.
 
  69      * May be overridden by junit tests.
 
  71     @Getter(AccessLevel.PROTECTED)
 
  72     @Setter(AccessLevel.PROTECTED)
 
  73     private Executor blockingExecutor = command -> {
 
  74         Thread thread = new Thread(command);
 
  75         thread.setDaemon(true);
 
  80     private final String actorName;
 
  83     private final String name;
 
  86      * Constructs the object.
 
  88      * @param actorName name of the actor with which this operator is associated
 
  89      * @param name operation name
 
  91     public OperatorPartial(String actorName, String name) {
 
  92         super(actorName + "." + name);
 
  93         this.actorName = actorName;
 
  98      * This method does nothing.
 
 101     protected void doConfigure(Map<String, Object> parameters) {
 
 106      * This method does nothing.
 
 109     protected void doStart() {
 
 114      * This method does nothing.
 
 117     protected void doStop() {
 
 122      * This method does nothing.
 
 125     protected void doShutdown() {
 
 130     public final CompletableFuture<OperationOutcome> startOperation(ControlLoopOperationParams params) {
 
 132             throw new IllegalStateException("operation is not running: " + getFullName());
 
 135         // allocate a controller for the entire operation
 
 136         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 138         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync(params);
 
 139         if (preproc == null) {
 
 140             // no preprocessor required - just start the operation
 
 141             return startOperationAttempt(params, controller, 1);
 
 145          * Do preprocessor first and then, if successful, start the operation. Note:
 
 146          * operations create their own outcome, ignoring the outcome from any previous
 
 149          * Wrap the preprocessor to ensure "stop" is propagated to it.
 
 152         controller.wrap(preproc)
 
 153                         .exceptionally(fromException(params, "preprocessor of operation"))
 
 154                         .thenCompose(handlePreprocessorFailure(params, controller))
 
 155                         .thenCompose(unusedOutcome -> startOperationAttempt(params, controller, 1));
 
 162      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
 
 163      * invokes the call-backs, marks the controller complete, and returns an incomplete
 
 164      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
 
 167      * Assumes that no callbacks have been invoked yet.
 
 169      * @param params operation parameters
 
 170      * @param controller pipeline controller
 
 171      * @return a function that checks the outcome status and continues, if successful, or
 
 172      *         indicates a failure otherwise
 
 174     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
 
 175                     ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller) {
 
 179             if (outcome != null && isSuccess(outcome)) {
 
 180                 logger.trace("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
 
 181                 return CompletableFuture.completedFuture(outcome);
 
 184             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
 
 186             final Executor executor = params.getExecutor();
 
 187             final CallbackManager callbacks = new CallbackManager();
 
 189             // propagate "stop" to the callbacks
 
 190             controller.add(callbacks);
 
 192             final OperationOutcome outcome2 = params.makeOutcome();
 
 194             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
 
 196             outcome2.setResult(PolicyResult.FAILURE_GUARD);
 
 197             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
 
 200             CompletableFuture.completedFuture(outcome2)
 
 201                             .whenCompleteAsync(callbackStarted(params, callbacks), executor)
 
 202                             .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
 
 203                             .whenCompleteAsync(controller.delayedComplete(), executor);
 
 206             return new CompletableFuture<>();
 
 211      * Invokes the operation's preprocessor step(s) as a "future". This method simply
 
 212      * returns {@code null}.
 
 214      * This method assumes the following:
 
 216      * <li>the operator is alive</li>
 
 217      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
 
 220      * @param params operation parameters
 
 221      * @return a function that will start the preprocessor and returns its outcome, or
 
 222      *         {@code null} if this operation needs no preprocessor
 
 224     protected CompletableFuture<OperationOutcome> startPreprocessorAsync(ControlLoopOperationParams params) {
 
 229      * Starts the operation attempt, with no preprocessor. When all retries complete, it
 
 230      * will complete the controller.
 
 232      * @param params operation parameters
 
 233      * @param controller controller for all operation attempts
 
 234      * @param attempt attempt number, typically starting with 1
 
 235      * @return a future that will return the final result of all attempts
 
 237     private CompletableFuture<OperationOutcome> startOperationAttempt(ControlLoopOperationParams params,
 
 238                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
 
 240         // propagate "stop" to the operation attempt
 
 241         controller.wrap(startAttemptWithoutRetries(params, attempt))
 
 242                         .thenCompose(retryOnFailure(params, controller, attempt));
 
 248      * Starts the operation attempt, without doing any retries.
 
 250      * @param params operation parameters
 
 251      * @param attempt attempt number, typically starting with 1
 
 252      * @return a future that will return the result of a single operation attempt
 
 254     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(ControlLoopOperationParams params,
 
 257         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
 
 259         final Executor executor = params.getExecutor();
 
 260         final OperationOutcome outcome = params.makeOutcome();
 
 261         final CallbackManager callbacks = new CallbackManager();
 
 263         // this operation attempt gets its own controller
 
 264         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 266         // propagate "stop" to the callbacks
 
 267         controller.add(callbacks);
 
 270         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
 
 271                         .whenCompleteAsync(callbackStarted(params, callbacks), executor)
 
 272                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(params, attempt, outcome2)));
 
 275         // handle timeouts, if specified
 
 276         long timeoutMillis = getTimeOutMillis(params.getTimeoutSec());
 
 277         if (timeoutMillis > 0) {
 
 278             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
 
 279             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
 
 283          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
 
 284          * before callbackCompleted() is invoked.
 
 286          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
 
 287          * the pipeline as the last step anyway.
 
 291         future.exceptionally(fromException(params, "operation"))
 
 292                     .thenApply(setRetryFlag(params, attempt))
 
 293                     .whenCompleteAsync(callbackStarted(params, callbacks), executor)
 
 294                     .whenCompleteAsync(callbackCompleted(params, callbacks), executor)
 
 295                     .whenCompleteAsync(controller.delayedComplete(), executor);
 
 302      * Determines if the outcome was successful.
 
 304      * @param outcome outcome to examine
 
 305      * @return {@code true} if the outcome was successful
 
 307     protected boolean isSuccess(OperationOutcome outcome) {
 
 308         return (outcome.getResult() == PolicyResult.SUCCESS);
 
 312      * Determines if the outcome was a failure for this operator.
 
 314      * @param outcome outcome to examine, or {@code null}
 
 315      * @return {@code true} if the outcome is not {@code null} and was a failure
 
 316      *         <i>and</i> was associated with this operator, {@code false} otherwise
 
 318     protected boolean isActorFailed(OperationOutcome outcome) {
 
 319         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
 
 323      * Determines if the given outcome is for this operation.
 
 325      * @param outcome outcome to examine
 
 326      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
 
 328     protected boolean isSameOperation(OperationOutcome outcome) {
 
 329         return OperationOutcome.isFor(outcome, getActorName(), getName());
 
 333      * Invokes the operation as a "future". This method simply invokes
 
 334      * {@link #doOperation(ControlLoopOperationParams)} using the {@link #blockingExecutor
 
 335      * "blocking executor"}, returning the result via a "future".
 
 337      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
 
 338      * the executor in the "params", as that may bring the background thread pool to a
 
 339      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
 
 342      * This method assumes the following:
 
 344      * <li>the operator is alive</li>
 
 345      * <li>verifyRunning() has been invoked</li>
 
 346      * <li>callbackStarted() has been invoked</li>
 
 347      * <li>the invoker will perform appropriate timeout checks</li>
 
 348      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
 
 351      * @param params operation parameters
 
 352      * @param attempt attempt number, typically starting with 1
 
 353      * @return a function that will start the operation and return its result when
 
 356     protected CompletableFuture<OperationOutcome> startOperationAsync(ControlLoopOperationParams params, int attempt,
 
 357                     OperationOutcome outcome) {
 
 359         return CompletableFuture.supplyAsync(() -> doOperation(params, attempt, outcome), getBlockingExecutor());
 
 363      * Low-level method that performs the operation. This can make the same assumptions
 
 364      * that are made by {@link #doOperationAsFuture(ControlLoopOperationParams)}. This
 
 365      * particular method simply throws an {@link UnsupportedOperationException}.
 
 367      * @param params operation parameters
 
 368      * @param attempt attempt number, typically starting with 1
 
 369      * @param operation the operation being performed
 
 370      * @return the outcome of the operation
 
 372     protected OperationOutcome doOperation(ControlLoopOperationParams params, int attempt, OperationOutcome operation) {
 
 374         throw new UnsupportedOperationException("start operation " + getFullName());
 
 378      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
 
 379      * FAILURE, assuming the policy specifies retries and the retry count has been
 
 382      * @param params operation parameters
 
 383      * @param attempt latest attempt number, starting with 1
 
 384      * @return a function to get the next future to execute
 
 386     private Function<OperationOutcome, OperationOutcome> setRetryFlag(ControlLoopOperationParams params, int attempt) {
 
 388         return operation -> {
 
 389             if (operation != null && !isActorFailed(operation)) {
 
 391                  * wrong type or wrong operation - just leave it as is. No need to log
 
 392                  * anything here, as retryOnFailure() will log a message
 
 397             // get a non-null operation
 
 398             OperationOutcome oper2;
 
 399             if (operation != null) {
 
 402                 oper2 = params.makeOutcome();
 
 403                 oper2.setResult(PolicyResult.FAILURE);
 
 406             Integer retry = params.getRetry();
 
 407             if (retry != null && retry > 0 && attempt > retry) {
 
 409                  * retries were specified and we've already tried them all - change to
 
 412                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
 
 413                 oper2.setResult(PolicyResult.FAILURE_RETRIES);
 
 421      * Restarts the operation if it was a FAILURE. Assumes that
 
 422      * {@link #setRetryFlag(ControlLoopOperationParams, int)} was previously invoked, and
 
 423      * thus that the "operation" is not {@code null}.
 
 425      * @param params operation parameters
 
 426      * @param controller controller for all of the retries
 
 427      * @param attempt latest attempt number, starting with 1
 
 428      * @return a function to get the next future to execute
 
 430     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
 
 431                     ControlLoopOperationParams params, PipelineControllerFuture<OperationOutcome> controller,
 
 434         return operation -> {
 
 435             if (!isActorFailed(operation)) {
 
 436                 // wrong type or wrong operation - just leave it as is
 
 437                 logger.trace("not retrying operation {} for {}", getFullName(), params.getRequestId());
 
 438                 controller.complete(operation);
 
 439                 return new CompletableFuture<>();
 
 442             Integer retry = params.getRetry();
 
 443             if (retry == null || retry <= 0) {
 
 444                 // no retries - already marked as FAILURE, so just return it
 
 445                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
 
 446                 controller.complete(operation);
 
 447                 return new CompletableFuture<>();
 
 452              * Retry the operation.
 
 454             logger.info("retry operation {} for {}", getFullName(), params.getRequestId());
 
 456             return startOperationAttempt(params, controller, attempt + 1);
 
 461      * Converts an exception into an operation outcome, returning a copy of the outcome to
 
 462      * prevent background jobs from changing it.
 
 464      * @param params operation parameters
 
 465      * @param type type of item throwing the exception
 
 466      * @return a function that will convert an exception into an operation outcome
 
 468     private Function<Throwable, OperationOutcome> fromException(ControlLoopOperationParams params, String type) {
 
 471             OperationOutcome outcome = params.makeOutcome();
 
 473             logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
 
 474                             params.getRequestId(), thrown);
 
 476             return setOutcome(params, outcome, thrown);
 
 481      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
 
 482      * any outstanding futures when one completes.
 
 484      * @param params operation parameters
 
 485      * @param futures futures for which to wait
 
 486      * @return a future to cancel or await an outcome. If this future is canceled, then
 
 487      *         all of the futures will be canceled
 
 489     protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
 
 490                     List<CompletableFuture<OperationOutcome>> futures) {
 
 492         // convert list to an array
 
 493         @SuppressWarnings("rawtypes")
 
 494         CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
 
 496         @SuppressWarnings("unchecked")
 
 497         CompletableFuture<OperationOutcome> result = anyOf(params, arrFutures);
 
 502      * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
 
 503      * outstanding futures when one completes.
 
 505      * @param params operation parameters
 
 506      * @param futures futures for which to wait
 
 507      * @return a future to cancel or await an outcome. If this future is canceled, then
 
 508      *         all of the futures will be canceled
 
 510     protected CompletableFuture<OperationOutcome> anyOf(ControlLoopOperationParams params,
 
 511                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
 
 513         final Executor executor = params.getExecutor();
 
 514         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 516         attachFutures(controller, futures);
 
 519         CompletableFuture.anyOf(futures)
 
 520                             .thenApply(object -> (OperationOutcome) object)
 
 521                             .whenCompleteAsync(controller.delayedComplete(), executor);
 
 528      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
 
 529      * the futures if returned future is canceled. The future returns the "worst" outcome,
 
 530      * based on priority (see {@link #detmPriority(OperationOutcome)}).
 
 532      * @param params operation parameters
 
 533      * @param futures futures for which to wait
 
 534      * @return a future to cancel or await an outcome. If this future is canceled, then
 
 535      *         all of the futures will be canceled
 
 537     protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
 
 538                     List<CompletableFuture<OperationOutcome>> futures) {
 
 540         // convert list to an array
 
 541         @SuppressWarnings("rawtypes")
 
 542         CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
 
 544         @SuppressWarnings("unchecked")
 
 545         CompletableFuture<OperationOutcome> result = allOf(params, arrFutures);
 
 550      * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
 
 551      * futures if returned future is canceled. The future returns the "worst" outcome,
 
 552      * based on priority (see {@link #detmPriority(OperationOutcome)}).
 
 554      * @param params operation parameters
 
 555      * @param futures futures for which to wait
 
 556      * @return a future to cancel or await an outcome. If this future is canceled, then
 
 557      *         all of the futures will be canceled
 
 559     protected CompletableFuture<OperationOutcome> allOf(ControlLoopOperationParams params,
 
 560                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
 
 562         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 564         attachFutures(controller, futures);
 
 566         OperationOutcome[] outcomes = new OperationOutcome[futures.length];
 
 568         @SuppressWarnings("rawtypes")
 
 569         CompletableFuture[] futures2 = new CompletableFuture[futures.length];
 
 571         // record the outcomes of each future when it completes
 
 572         for (int count = 0; count < futures2.length; ++count) {
 
 573             final int count2 = count;
 
 574             futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
 
 577         CompletableFuture.allOf(futures2).whenComplete(combineOutcomes(params, controller, outcomes));
 
 583      * Attaches the given futures to the controller.
 
 585      * @param controller master controller for all of the futures
 
 586      * @param futures futures to be attached to the controller
 
 588     private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
 
 589                     @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
 
 592         for (CompletableFuture<OperationOutcome> future : futures) {
 
 593             controller.add(future);
 
 598      * Combines the outcomes from a set of tasks.
 
 600      * @param params operation parameters
 
 601      * @param future future to be completed with the combined result
 
 602      * @param outcomes outcomes to be examined
 
 604     private BiConsumer<Void, Throwable> combineOutcomes(ControlLoopOperationParams params,
 
 605                     CompletableFuture<OperationOutcome> future, OperationOutcome[] outcomes) {
 
 607         return (unused, thrown) -> {
 
 608             if (thrown != null) {
 
 609                 future.completeExceptionally(thrown);
 
 613             // identify the outcome with the highest priority
 
 614             OperationOutcome outcome = outcomes[0];
 
 615             int priority = detmPriority(outcome);
 
 617             // start with "1", as we've already dealt with "0"
 
 618             for (int count = 1; count < outcomes.length; ++count) {
 
 619                 OperationOutcome outcome2 = outcomes[count];
 
 620                 int priority2 = detmPriority(outcome2);
 
 622                 if (priority2 > priority) {
 
 624                     priority = priority2;
 
 628             logger.trace("{}: combined outcome of tasks is {} for {}", getFullName(),
 
 629                             (outcome == null ? null : outcome.getResult()), params.getRequestId());
 
 631             future.complete(outcome);
 
 636      * Determines the priority of an outcome based on its result.
 
 638      * @param outcome outcome to examine, or {@code null}
 
 639      * @return the outcome's priority
 
 641     protected int detmPriority(OperationOutcome outcome) {
 
 642         if (outcome == null) {
 
 646         switch (outcome.getResult()) {
 
 653             case FAILURE_RETRIES:
 
 659             case FAILURE_TIMEOUT:
 
 662             case FAILURE_EXCEPTION:
 
 669      * Performs a task, after verifying that the controller is still running. Also checks
 
 670      * that the previous outcome was successful, if specified.
 
 672      * @param params operation parameters
 
 673      * @param controller overall pipeline controller
 
 674      * @param checkSuccess {@code true} to check the previous outcome, {@code false}
 
 676      * @param outcome outcome of the previous task
 
 677      * @param tasks tasks to be performed
 
 678      * @return a function to perform the task. If everything checks out, then it returns
 
 679      *         the task's future. Otherwise, it returns an incomplete future and completes
 
 680      *         the controller instead.
 
 683     protected CompletableFuture<OperationOutcome> doTask(ControlLoopOperationParams params,
 
 684                     PipelineControllerFuture<OperationOutcome> controller,
 
 685                     boolean checkSuccess, OperationOutcome outcome,
 
 686                     CompletableFuture<OperationOutcome> task) {
 
 689         if (checkSuccess && !isSuccess(outcome)) {
 
 691              * must complete before canceling so that cancel() doesn't cause controller to
 
 694             controller.complete(outcome);
 
 696             return new CompletableFuture<>();
 
 699         return controller.wrap(task);
 
 703      * Performs a task, after verifying that the controller is still running. Also checks
 
 704      * that the previous outcome was successful, if specified.
 
 706      * @param params operation parameters
 
 707      * @param controller overall pipeline controller
 
 708      * @param checkSuccess {@code true} to check the previous outcome, {@code false}
 
 710      * @param tasks tasks to be performed
 
 711      * @return a function to perform the task. If everything checks out, then it returns
 
 712      *         the task's future. Otherwise, it returns an incomplete future and completes
 
 713      *         the controller instead.
 
 716     protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(ControlLoopOperationParams params,
 
 717                     PipelineControllerFuture<OperationOutcome> controller,
 
 718                     boolean checkSuccess,
 
 719                     Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
 
 724             if (!controller.isRunning()) {
 
 725                 return new CompletableFuture<>();
 
 728             if (checkSuccess && !isSuccess(outcome)) {
 
 729                 controller.complete(outcome);
 
 730                 return new CompletableFuture<>();
 
 733             return controller.wrap(task.apply(outcome));
 
 738      * Sets the start time of the operation and invokes the callback to indicate that the
 
 739      * operation has started. Does nothing if the pipeline has been stopped.
 
 741      * This assumes that the "outcome" is not {@code null}.
 
 743      * @param params operation parameters
 
 744      * @param callbacks used to determine if the start callback can be invoked
 
 745      * @return a function that sets the start time and invokes the callback
 
 747     private BiConsumer<OperationOutcome, Throwable> callbackStarted(ControlLoopOperationParams params,
 
 748                     CallbackManager callbacks) {
 
 750         return (outcome, thrown) -> {
 
 752             if (callbacks.canStart()) {
 
 753                 // haven't invoked "start" callback yet
 
 754                 outcome.setStart(callbacks.getStartTime());
 
 755                 outcome.setEnd(null);
 
 756                 params.callbackStarted(outcome);
 
 762      * Sets the end time of the operation and invokes the callback to indicate that the
 
 763      * operation has completed. Does nothing if the pipeline has been stopped.
 
 765      * This assumes that the "outcome" is not {@code null}.
 
 767      * Note: the start time must be a reference rather than a plain value, because it's
 
 768      * value must be gotten on-demand, when the returned function is executed at a later
 
 771      * @param params operation parameters
 
 772      * @param callbacks used to determine if the end callback can be invoked
 
 773      * @return a function that sets the end time and invokes the callback
 
 775     private BiConsumer<OperationOutcome, Throwable> callbackCompleted(ControlLoopOperationParams params,
 
 776                     CallbackManager callbacks) {
 
 778         return (outcome, thrown) -> {
 
 780             if (callbacks.canEnd()) {
 
 781                 outcome.setStart(callbacks.getStartTime());
 
 782                 outcome.setEnd(callbacks.getEndTime());
 
 783                 params.callbackCompleted(outcome);
 
 789      * Sets an operation's outcome and message, based on a throwable.
 
 791      * @param params operation parameters
 
 792      * @param operation operation to be updated
 
 793      * @return the updated operation
 
 795     protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
 
 797         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
 
 798         return setOutcome(params, operation, result);
 
 802      * Sets an operation's outcome and default message based on the result.
 
 804      * @param params operation parameters
 
 805      * @param operation operation to be updated
 
 806      * @param result result of the operation
 
 807      * @return the updated operation
 
 809     protected OperationOutcome setOutcome(ControlLoopOperationParams params, OperationOutcome operation,
 
 810                     PolicyResult result) {
 
 811         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
 
 812         operation.setResult(result);
 
 813         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
 
 814                         : ControlLoopOperation.FAILED_MSG);
 
 820      * Determines if a throwable is due to a timeout.
 
 822      * @param thrown throwable of interest
 
 823      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
 
 825     protected boolean isTimeout(Throwable thrown) {
 
 826         if (thrown instanceof CompletionException) {
 
 827             thrown = thrown.getCause();
 
 830         return (thrown instanceof TimeoutException);
 
 833     // these may be overridden by junit tests
 
 836      * Gets the operation timeout. Subclasses may override this method to obtain the
 
 837      * timeout in some other way (e.g., through configuration properties).
 
 839      * @param timeoutSec timeout, in seconds, or {@code null}
 
 840      * @return the operation timeout, in milliseconds
 
 842     protected long getTimeOutMillis(Integer timeoutSec) {
 
 843         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));