2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.controlloop.actorserviceprovider.impl;
 
  23 import java.util.ArrayDeque;
 
  24 import java.util.ArrayList;
 
  25 import java.util.Arrays;
 
  26 import java.util.LinkedList;
 
  27 import java.util.List;
 
  28 import java.util.Queue;
 
  29 import java.util.concurrent.CompletableFuture;
 
  30 import java.util.concurrent.CompletionException;
 
  31 import java.util.concurrent.Executor;
 
  32 import java.util.concurrent.TimeUnit;
 
  33 import java.util.concurrent.TimeoutException;
 
  34 import java.util.function.BiConsumer;
 
  35 import java.util.function.Function;
 
  36 import java.util.function.Supplier;
 
  37 import java.util.function.UnaryOperator;
 
  38 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
 
  39 import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
 
  40 import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
 
  41 import org.onap.policy.common.utils.coder.Coder;
 
  42 import org.onap.policy.common.utils.coder.CoderException;
 
  43 import org.onap.policy.common.utils.coder.StandardCoder;
 
  44 import org.onap.policy.controlloop.ControlLoopOperation;
 
  45 import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
 
  46 import org.onap.policy.controlloop.actorserviceprovider.Operation;
 
  47 import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
 
  48 import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
 
  49 import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
 
  50 import org.onap.policy.controlloop.policy.PolicyResult;
 
  51 import org.slf4j.Logger;
 
  52 import org.slf4j.LoggerFactory;
 
  55  * Partial implementation of an operator. In general, it's preferable that subclasses
 
  56  * would override {@link #startOperationAsync(int, OperationOutcome)
 
  57  * startOperationAsync()}. However, if that proves to be too difficult, then they can
 
  58  * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
 
  59  * if the operation requires any preprocessor steps, the subclass may choose to override
 
  60  * {@link #startPreprocessorAsync()}.
 
  62  * The futures returned by the methods within this class can be canceled, and will
 
  63  * propagate the cancellation to any subtasks. Thus it is also expected that any futures
 
  64  * returned by overridden methods will do the same. Of course, if a class overrides
 
  65  * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
 
  66  * be done to cancel that particular operation.
 
  68 public abstract class OperationPartial implements Operation {
 
  69     private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
 
  70     private static final Coder coder = new StandardCoder();
 
  72     public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
 
  74     // values extracted from the operator
 
  76     private final OperatorPartial operator;
 
  79      * Operation parameters.
 
  81     protected final ControlLoopOperationParams params;
 
  85      * Constructs the object.
 
  87      * @param params operation parameters
 
  88      * @param operator operator that created this operation
 
  90     public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) {
 
  92         this.operator = operator;
 
  95     public Executor getBlockingExecutor() {
 
  96         return operator.getBlockingExecutor();
 
  99     public String getFullName() {
 
 100         return operator.getFullName();
 
 103     public String getActorName() {
 
 104         return operator.getActorName();
 
 107     public String getName() {
 
 108         return operator.getName();
 
 112     public final CompletableFuture<OperationOutcome> start() {
 
 113         if (!operator.isAlive()) {
 
 114             throw new IllegalStateException("operation is not running: " + getFullName());
 
 117         // allocate a controller for the entire operation
 
 118         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 120         CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
 
 121         if (preproc == null) {
 
 122             // no preprocessor required - just start the operation
 
 123             return startOperationAttempt(controller, 1);
 
 127          * Do preprocessor first and then, if successful, start the operation. Note:
 
 128          * operations create their own outcome, ignoring the outcome from any previous
 
 131          * Wrap the preprocessor to ensure "stop" is propagated to it.
 
 134         controller.wrap(preproc)
 
 135                         .exceptionally(fromException("preprocessor of operation"))
 
 136                         .thenCompose(handlePreprocessorFailure(controller))
 
 137                         .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
 
 138                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
 
 145      * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
 
 146      * invokes the call-backs, marks the controller complete, and returns an incomplete
 
 147      * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
 
 150      * Assumes that no callbacks have been invoked yet.
 
 152      * @param controller pipeline controller
 
 153      * @return a function that checks the outcome status and continues, if successful, or
 
 154      *         indicates a failure otherwise
 
 156     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
 
 157                     PipelineControllerFuture<OperationOutcome> controller) {
 
 161             if (outcome != null && isSuccess(outcome)) {
 
 162                 logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
 
 163                 return CompletableFuture.completedFuture(outcome);
 
 166             logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
 
 168             final Executor executor = params.getExecutor();
 
 169             final CallbackManager callbacks = new CallbackManager();
 
 171             // propagate "stop" to the callbacks
 
 172             controller.add(callbacks);
 
 174             final OperationOutcome outcome2 = params.makeOutcome();
 
 176             // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
 
 178             outcome2.setResult(PolicyResult.FAILURE_GUARD);
 
 179             outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
 
 182             CompletableFuture.completedFuture(outcome2)
 
 183                             .whenCompleteAsync(callbackStarted(callbacks), executor)
 
 184                             .whenCompleteAsync(callbackCompleted(callbacks), executor)
 
 185                             .whenCompleteAsync(controller.delayedComplete(), executor);
 
 188             return new CompletableFuture<>();
 
 193      * Invokes the operation's preprocessor step(s) as a "future". This method simply
 
 194      * invokes {@link #startGuardAsync()}.
 
 196      * This method assumes the following:
 
 198      * <li>the operator is alive</li>
 
 199      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
 
 202      * @return a function that will start the preprocessor and returns its outcome, or
 
 203      *         {@code null} if this operation needs no preprocessor
 
 205     protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
 
 206         return startGuardAsync();
 
 210      * Invokes the operation's guard step(s) as a "future". This method simply returns
 
 213      * This method assumes the following:
 
 215      * <li>the operator is alive</li>
 
 216      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
 
 219      * @return a function that will start the guard checks and returns its outcome, or
 
 220      *         {@code null} if this operation has no guard
 
 222     protected CompletableFuture<OperationOutcome> startGuardAsync() {
 
 227      * Starts the operation attempt, with no preprocessor. When all retries complete, it
 
 228      * will complete the controller.
 
 230      * @param controller controller for all operation attempts
 
 231      * @param attempt attempt number, typically starting with 1
 
 232      * @return a future that will return the final result of all attempts
 
 234     private CompletableFuture<OperationOutcome> startOperationAttempt(
 
 235                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
 
 237         // propagate "stop" to the operation attempt
 
 238         controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
 
 239                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
 
 245      * Starts the operation attempt, without doing any retries.
 
 247      * @param params operation parameters
 
 248      * @param attempt attempt number, typically starting with 1
 
 249      * @return a future that will return the result of a single operation attempt
 
 251     private CompletableFuture<OperationOutcome> startAttemptWithoutRetries(int attempt) {
 
 253         logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
 
 255         final Executor executor = params.getExecutor();
 
 256         final OperationOutcome outcome = params.makeOutcome();
 
 257         final CallbackManager callbacks = new CallbackManager();
 
 259         // this operation attempt gets its own controller
 
 260         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 262         // propagate "stop" to the callbacks
 
 263         controller.add(callbacks);
 
 266         CompletableFuture<OperationOutcome> future = CompletableFuture.completedFuture(outcome)
 
 267                         .whenCompleteAsync(callbackStarted(callbacks), executor)
 
 268                         .thenCompose(controller.wrap(outcome2 -> startOperationAsync(attempt, outcome2)));
 
 271         // handle timeouts, if specified
 
 272         long timeoutMillis = getTimeoutMs(params.getTimeoutSec());
 
 273         if (timeoutMillis > 0) {
 
 274             logger.info("{}: set timeout to {}ms for {}", getFullName(), timeoutMillis, params.getRequestId());
 
 275             future = future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
 
 279          * Note: we re-invoke callbackStarted() just to be sure the callback is invoked
 
 280          * before callbackCompleted() is invoked.
 
 282          * Note: no need to remove "callbacks" from the pipeline, as we're going to stop
 
 283          * the pipeline as the last step anyway.
 
 287         future.exceptionally(fromException("operation"))
 
 288                     .thenApply(setRetryFlag(attempt))
 
 289                     .whenCompleteAsync(callbackStarted(callbacks), executor)
 
 290                     .whenCompleteAsync(callbackCompleted(callbacks), executor)
 
 291                     .whenCompleteAsync(controller.delayedComplete(), executor);
 
 298      * Determines if the outcome was successful.
 
 300      * @param outcome outcome to examine
 
 301      * @return {@code true} if the outcome was successful
 
 303     protected boolean isSuccess(OperationOutcome outcome) {
 
 304         return (outcome.getResult() == PolicyResult.SUCCESS);
 
 308      * Determines if the outcome was a failure for this operator.
 
 310      * @param outcome outcome to examine, or {@code null}
 
 311      * @return {@code true} if the outcome is not {@code null} and was a failure
 
 312      *         <i>and</i> was associated with this operator, {@code false} otherwise
 
 314     protected boolean isActorFailed(OperationOutcome outcome) {
 
 315         return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
 
 319      * Determines if the given outcome is for this operation.
 
 321      * @param outcome outcome to examine
 
 322      * @return {@code true} if the outcome is for this operation, {@code false} otherwise
 
 324     protected boolean isSameOperation(OperationOutcome outcome) {
 
 325         return OperationOutcome.isFor(outcome, getActorName(), getName());
 
 329      * Invokes the operation as a "future". This method simply invokes
 
 330      * {@link #doOperation()} using the {@link #blockingExecutor "blocking executor"},
 
 331      * returning the result via a "future".
 
 333      * Note: if the operation uses blocking I/O, then it should <i>not</i> be run using
 
 334      * the executor in the "params", as that may bring the background thread pool to a
 
 335      * grinding halt. The {@link #blockingExecutor "blocking executor"} should be used
 
 338      * This method assumes the following:
 
 340      * <li>the operator is alive</li>
 
 341      * <li>verifyRunning() has been invoked</li>
 
 342      * <li>callbackStarted() has been invoked</li>
 
 343      * <li>the invoker will perform appropriate timeout checks</li>
 
 344      * <li>exceptions generated within the pipeline will be handled by the invoker</li>
 
 347      * @param attempt attempt number, typically starting with 1
 
 348      * @return a function that will start the operation and return its result when
 
 351     protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
 
 353         return CompletableFuture.supplyAsync(() -> doOperation(attempt, outcome), getBlockingExecutor());
 
 357      * Low-level method that performs the operation. This can make the same assumptions
 
 358      * that are made by {@link #doOperationAsFuture()}. This particular method simply
 
 359      * throws an {@link UnsupportedOperationException}.
 
 361      * @param attempt attempt number, typically starting with 1
 
 362      * @param operation the operation being performed
 
 363      * @return the outcome of the operation
 
 365     protected OperationOutcome doOperation(int attempt, OperationOutcome operation) {
 
 367         throw new UnsupportedOperationException("start operation " + getFullName());
 
 371      * Sets the outcome status to FAILURE_RETRIES, if the current operation outcome is
 
 372      * FAILURE, assuming the policy specifies retries and the retry count has been
 
 375      * @param attempt latest attempt number, starting with 1
 
 376      * @return a function to get the next future to execute
 
 378     private Function<OperationOutcome, OperationOutcome> setRetryFlag(int attempt) {
 
 380         return operation -> {
 
 381             if (operation != null && !isActorFailed(operation)) {
 
 383                  * wrong type or wrong operation - just leave it as is. No need to log
 
 384                  * anything here, as retryOnFailure() will log a message
 
 389             // get a non-null operation
 
 390             OperationOutcome oper2;
 
 391             if (operation != null) {
 
 394                 oper2 = params.makeOutcome();
 
 395                 oper2.setResult(PolicyResult.FAILURE);
 
 398             int retry = getRetry(params.getRetry());
 
 399             if (retry > 0 && attempt > retry) {
 
 401                  * retries were specified and we've already tried them all - change to
 
 404                 logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
 
 405                 oper2.setResult(PolicyResult.FAILURE_RETRIES);
 
 413      * Restarts the operation if it was a FAILURE. Assumes that {@link #setRetryFlag(int)}
 
 414      * was previously invoked, and thus that the "operation" is not {@code null}.
 
 416      * @param controller controller for all of the retries
 
 417      * @param attempt latest attempt number, starting with 1
 
 418      * @return a function to get the next future to execute
 
 420     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> retryOnFailure(
 
 421                     PipelineControllerFuture<OperationOutcome> controller, int attempt) {
 
 423         return operation -> {
 
 424             if (!isActorFailed(operation)) {
 
 425                 // wrong type or wrong operation - just leave it as is
 
 426                 logger.info("not retrying operation {} for {}", getFullName(), params.getRequestId());
 
 427                 controller.complete(operation);
 
 428                 return new CompletableFuture<>();
 
 431             if (getRetry(params.getRetry()) <= 0) {
 
 432                 // no retries - already marked as FAILURE, so just return it
 
 433                 logger.info("operation {} no retries for {}", getFullName(), params.getRequestId());
 
 434                 controller.complete(operation);
 
 435                 return new CompletableFuture<>();
 
 439              * Retry the operation.
 
 441             long waitMs = getRetryWaitMs();
 
 442             logger.info("retry operation {} in {}ms for {}", getFullName(), waitMs, params.getRequestId());
 
 444             return sleep(waitMs, TimeUnit.MILLISECONDS)
 
 445                             .thenCompose(unused -> startOperationAttempt(controller, attempt + 1));
 
 450      * Convenience method that starts a sleep(), running via a future.
 
 452      * @param sleepTime time to sleep
 
 453      * @param unit time unit
 
 454      * @return a future that will complete when the sleep completes
 
 456     protected CompletableFuture<Void> sleep(long sleepTime, TimeUnit unit) {
 
 457         if (sleepTime <= 0) {
 
 458             return CompletableFuture.completedFuture(null);
 
 461         return new CompletableFuture<Void>().completeOnTimeout(null, sleepTime, unit);
 
 465      * Converts an exception into an operation outcome, returning a copy of the outcome to
 
 466      * prevent background jobs from changing it.
 
 468      * @param type type of item throwing the exception
 
 469      * @return a function that will convert an exception into an operation outcome
 
 471     private Function<Throwable, OperationOutcome> fromException(String type) {
 
 474             OperationOutcome outcome = params.makeOutcome();
 
 476             logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
 
 477                             params.getRequestId(), thrown);
 
 479             return setOutcome(outcome, thrown);
 
 484      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
 
 485      * any outstanding futures when one completes.
 
 487      * @param futureMakers function to make a future. If the function returns
 
 488      *        {@code null}, then no future is created for that function. On the other
 
 489      *        hand, if the function throws an exception, then the previously created
 
 490      *        functions are canceled and the exception is re-thrown
 
 491      * @return a future to cancel or await an outcome, or {@code null} if no futures were
 
 492      *         created. If this future is canceled, then all of the futures will be
 
 495     protected CompletableFuture<OperationOutcome> anyOf(
 
 496                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
 
 498         return anyOf(Arrays.asList(futureMakers));
 
 502      * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
 
 503      * any outstanding futures when one completes.
 
 505      * @param futureMakers function to make a future. If the function returns
 
 506      *        {@code null}, then no future is created for that function. On the other
 
 507      *        hand, if the function throws an exception, then the previously created
 
 508      *        functions are canceled and the exception is re-thrown
 
 509      * @return a future to cancel or await an outcome, or {@code null} if no futures were
 
 510      *         created. If this future is canceled, then all of the futures will be
 
 511      *         canceled. Similarly, when this future completes, any incomplete futures
 
 514     protected CompletableFuture<OperationOutcome> anyOf(
 
 515                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
 
 517         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 519         CompletableFuture<OperationOutcome>[] futures =
 
 520                         attachFutures(controller, futureMakers, UnaryOperator.identity());
 
 522         if (futures.length == 0) {
 
 523             // no futures were started
 
 527         if (futures.length == 1) {
 
 531         CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
 
 532                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
 
 538      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
 
 540      * @param futureMakers function to make a future. If the function returns
 
 541      *        {@code null}, then no future is created for that function. On the other
 
 542      *        hand, if the function throws an exception, then the previously created
 
 543      *        functions are canceled and the exception is re-thrown
 
 544      * @return a future to cancel or await an outcome, or {@code null} if no futures were
 
 545      *         created. If this future is canceled, then all of the futures will be
 
 548     protected CompletableFuture<OperationOutcome> allOf(
 
 549                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
 
 551         return allOf(Arrays.asList(futureMakers));
 
 555      * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
 
 557      * @param futureMakers function to make a future. If the function returns
 
 558      *        {@code null}, then no future is created for that function. On the other
 
 559      *        hand, if the function throws an exception, then the previously created
 
 560      *        functions are canceled and the exception is re-thrown
 
 561      * @return a future to cancel or await an outcome, or {@code null} if no futures were
 
 562      *         created. If this future is canceled, then all of the futures will be
 
 563      *         canceled. Similarly, when this future completes, any incomplete futures
 
 566     protected CompletableFuture<OperationOutcome> allOf(
 
 567                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
 
 568         PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 570         Queue<OperationOutcome> outcomes = new LinkedList<>();
 
 572         CompletableFuture<OperationOutcome>[] futures =
 
 573                         attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
 
 574                             synchronized (outcomes) {
 
 575                                 outcomes.add(outcome);
 
 580         if (futures.length == 0) {
 
 581             // no futures were started
 
 585         if (futures.length == 1) {
 
 590         CompletableFuture.allOf(futures)
 
 591                         .thenApply(unused -> combineOutcomes(outcomes))
 
 592                         .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
 
 599      * Invokes the functions to create the futures and attaches them to the controller.
 
 601      * @param controller master controller for all of the futures
 
 602      * @param futureMakers futures to be attached to the controller
 
 603      * @param adorn function that "adorns" the future, possible adding onto its pipeline.
 
 604      *        Returns the adorned future
 
 605      * @return an array of futures, possibly zero-length. If the array is of size one,
 
 606      *         then that one item should be returned instead of the controller
 
 608     private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
 
 609                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
 
 610                     UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
 
 612         if (futureMakers.isEmpty()) {
 
 613             @SuppressWarnings("unchecked")
 
 614             CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
 
 618         // the last, unadorned future that is created
 
 619         CompletableFuture<OperationOutcome> lastFuture = null;
 
 621         List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
 
 624         for (var maker : futureMakers) {
 
 626                 CompletableFuture<OperationOutcome> future = maker.get();
 
 627                 if (future == null) {
 
 631                 // propagate "stop" to the future
 
 632                 controller.add(future);
 
 634                 futures.add(adorn.apply(future));
 
 638             } catch (RuntimeException e) {
 
 639                 logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
 
 640                 controller.cancel(false);
 
 645         @SuppressWarnings("unchecked")
 
 646         CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
 
 648         if (result.length == 1) {
 
 649             // special case - return the unadorned future
 
 650             result[0] = lastFuture;
 
 654         return futures.toArray(result);
 
 658      * Combines the outcomes from a set of tasks.
 
 660      * @param outcomes outcomes to be examined
 
 661      * @return the combined outcome
 
 663     private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
 
 665         // identify the outcome with the highest priority
 
 666         OperationOutcome outcome = outcomes.remove();
 
 667         int priority = detmPriority(outcome);
 
 669         for (OperationOutcome outcome2 : outcomes) {
 
 670             int priority2 = detmPriority(outcome2);
 
 672             if (priority2 > priority) {
 
 674                 priority = priority2;
 
 678         logger.info("{}: combined outcome of tasks is {} for {}", getFullName(),
 
 679                         (outcome == null ? null : outcome.getResult()), params.getRequestId());
 
 685      * Determines the priority of an outcome based on its result.
 
 687      * @param outcome outcome to examine, or {@code null}
 
 688      * @return the outcome's priority
 
 690     protected int detmPriority(OperationOutcome outcome) {
 
 691         if (outcome == null || outcome.getResult() == null) {
 
 695         switch (outcome.getResult()) {
 
 702             case FAILURE_RETRIES:
 
 708             case FAILURE_TIMEOUT:
 
 711             case FAILURE_EXCEPTION:
 
 718      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
 
 719      * not created until the previous task completes. The pipeline returns the outcome of
 
 720      * the last task executed.
 
 722      * @param futureMakers functions to make the futures
 
 723      * @return a future to cancel the sequence or await the outcome
 
 725     protected CompletableFuture<OperationOutcome> sequence(
 
 726                     @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
 
 728         return sequence(Arrays.asList(futureMakers));
 
 732      * Performs a sequence of tasks, stopping if a task fails. A given task's future is
 
 733      * not created until the previous task completes. The pipeline returns the outcome of
 
 734      * the last task executed.
 
 736      * @param futureMakers functions to make the futures
 
 737      * @return a future to cancel the sequence or await the outcome, or {@code null} if
 
 738      *         there were no tasks to perform
 
 740     protected CompletableFuture<OperationOutcome> sequence(
 
 741                     List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
 
 743         Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
 
 745         CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
 
 746         if (nextTask == null) {
 
 751         if (queue.isEmpty()) {
 
 752             // only one task - just return it rather than wrapping it in a controller
 
 757          * multiple tasks - need a controller to stop whichever task is currently
 
 760         final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
 
 761         final Executor executor = params.getExecutor();
 
 764         controller.wrap(nextTask)
 
 765                     .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
 
 766                     .whenCompleteAsync(controller.delayedComplete(), executor);
 
 773      * Executes the next task in the queue, if the previous outcome was successful.
 
 775      * @param controller pipeline controller
 
 776      * @param taskQueue queue of tasks to be performed
 
 777      * @return a future to execute the remaining tasks, or the current outcome, if it's a
 
 778      *         failure, or if there are no more tasks
 
 780     private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
 
 781                     PipelineControllerFuture<OperationOutcome> controller,
 
 782                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
 
 785             if (!isSuccess(outcome)) {
 
 786                 // return the failure
 
 787                 return CompletableFuture.completedFuture(outcome);
 
 790             CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
 
 791             if (nextTask == null) {
 
 792                 // no tasks - just return the success
 
 793                 return CompletableFuture.completedFuture(outcome);
 
 799                         .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
 
 805      * Gets the next task from the queue, skipping those that are {@code null}.
 
 807      * @param taskQueue task queue
 
 808      * @return the next task, or {@code null} if the queue is now empty
 
 810     private CompletableFuture<OperationOutcome> getNextTask(
 
 811                     Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
 
 813         Supplier<CompletableFuture<OperationOutcome>> maker;
 
 815         while ((maker = taskQueue.poll()) != null) {
 
 816             CompletableFuture<OperationOutcome> future = maker.get();
 
 817             if (future != null) {
 
 826      * Sets the start time of the operation and invokes the callback to indicate that the
 
 827      * operation has started. Does nothing if the pipeline has been stopped.
 
 829      * This assumes that the "outcome" is not {@code null}.
 
 831      * @param callbacks used to determine if the start callback can be invoked
 
 832      * @return a function that sets the start time and invokes the callback
 
 834     private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
 
 836         return (outcome, thrown) -> {
 
 838             if (callbacks.canStart()) {
 
 839                 // haven't invoked "start" callback yet
 
 840                 outcome.setStart(callbacks.getStartTime());
 
 841                 outcome.setEnd(null);
 
 842                 params.callbackStarted(outcome);
 
 848      * Sets the end time of the operation and invokes the callback to indicate that the
 
 849      * operation has completed. Does nothing if the pipeline has been stopped.
 
 851      * This assumes that the "outcome" is not {@code null}.
 
 853      * Note: the start time must be a reference rather than a plain value, because it's
 
 854      * value must be gotten on-demand, when the returned function is executed at a later
 
 857      * @param callbacks used to determine if the end callback can be invoked
 
 858      * @return a function that sets the end time and invokes the callback
 
 860     private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
 
 862         return (outcome, thrown) -> {
 
 864             if (callbacks.canEnd()) {
 
 865                 outcome.setStart(callbacks.getStartTime());
 
 866                 outcome.setEnd(callbacks.getEndTime());
 
 867                 params.callbackCompleted(outcome);
 
 873      * Sets an operation's outcome and message, based on a throwable.
 
 875      * @param operation operation to be updated
 
 876      * @return the updated operation
 
 878     protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
 
 879         PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
 
 880         return setOutcome(operation, result);
 
 884      * Sets an operation's outcome and default message based on the result.
 
 886      * @param operation operation to be updated
 
 887      * @param result result of the operation
 
 888      * @return the updated operation
 
 890     public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
 
 891         logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
 
 892         operation.setResult(result);
 
 893         operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
 
 894                         : ControlLoopOperation.FAILED_MSG);
 
 900      * Determines if a throwable is due to a timeout.
 
 902      * @param thrown throwable of interest
 
 903      * @return {@code true} if the throwable is due to a timeout, {@code false} otherwise
 
 905     protected boolean isTimeout(Throwable thrown) {
 
 906         if (thrown instanceof CompletionException) {
 
 907             thrown = thrown.getCause();
 
 910         return (thrown instanceof TimeoutException);
 
 914      * Logs a response. If the response is not of type, String, then it attempts to
 
 915      * pretty-print it into JSON before logging.
 
 917      * @param direction IN or OUT
 
 918      * @param infra communication infrastructure on which it was published
 
 919      * @param source source name (e.g., the URL or Topic name)
 
 920      * @param response response to be logged
 
 921      * @return the JSON text that was logged
 
 923     public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
 
 926             if (response == null) {
 
 928             } else if (response instanceof String) {
 
 929                 json = response.toString();
 
 931                 json = makeCoder().encode(response, true);
 
 934         } catch (CoderException e) {
 
 935             String type = (direction == EventType.IN ? "response" : "request");
 
 936             logger.warn("cannot pretty-print {}", type, e);
 
 937             json = response.toString();
 
 940         logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
 
 945     // these may be overridden by subclasses or junit tests
 
 948      * Gets the retry count.
 
 950      * @param retry retry, extracted from the parameters, or {@code null}
 
 951      * @return the number of retries, or {@code 0} if no retries were specified
 
 953     protected int getRetry(Integer retry) {
 
 954         return (retry == null ? 0 : retry);
 
 958      * Gets the retry wait, in milliseconds.
 
 960      * @return the retry wait, in milliseconds
 
 962     protected long getRetryWaitMs() {
 
 963         return DEFAULT_RETRY_WAIT_MS;
 
 967      * Gets the operation timeout.
 
 969      * @param timeoutSec timeout, in seconds, extracted from the parameters, or
 
 971      * @return the operation timeout, in milliseconds, or {@code 0} if no timeout was
 
 974     protected long getTimeoutMs(Integer timeoutSec) {
 
 975         return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
 
 978     // these may be overridden by junit tests
 
 980     protected Coder makeCoder() {