X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=models-interactions%2Fmodel-actors%2FactorServiceProvider%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fcontrolloop%2Factorserviceprovider%2Fimpl%2FOperationPartial.java;h=c19ad6c319c9d5de425b1f2b710430ec14d0ab6f;hb=49f07db935d114b72a44e446867b16262dd552aa;hp=d00b88bb5ff2c6cbd3ae437608ea941bb6fd5488;hpb=ad1cd2013f45da5764fc9610db1f679d3c3762cb;p=policy%2Fmodels.git diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java index d00b88bb5..c19ad6c31 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,16 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -28,13 +37,26 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.function.UnaryOperator; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil; +import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; +import org.onap.policy.common.utils.coder.Coder; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.common.utils.coder.StandardCoder; import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; import org.onap.policy.controlloop.actorserviceprovider.Operation; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.OperationProperties; +import org.onap.policy.controlloop.actorserviceprovider.OperationResult; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig; import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; -import org.onap.policy.controlloop.policy.PolicyResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,176 +64,128 @@ import org.slf4j.LoggerFactory; * Partial implementation of an operator. In general, it's preferable that subclasses * would override {@link #startOperationAsync(int, OperationOutcome) * startOperationAsync()}. However, if that proves to be too difficult, then they can - * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition, - * if the operation requires any preprocessor steps, the subclass may choose to override - * {@link #startPreprocessorAsync()}. + * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. *

* The futures returned by the methods within this class can be canceled, and will * propagate the cancellation to any subtasks. Thus it is also expected that any futures * returned by overridden methods will do the same. Of course, if a class overrides * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can * be done to cancel that particular operation. + *

+ * In general tasks in a pipeline are executed by the same thread. However, the following + * should always be executed via the executor specified in "params": + *

*/ public abstract class OperationPartial implements Operation { - private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class); - public static final long DEFAULT_RETRY_WAIT_MS = 1000L; + private static final Coder coder = new StandardCoder(); - // values extracted from the operator + public static final String GUARD_ACTOR_NAME = "GUARD"; + public static final String GUARD_OPERATION_NAME = "Decision"; + public static final long DEFAULT_RETRY_WAIT_MS = 1000L; - private final OperatorPartial operator; + private final OperatorConfig config; /** * Operation parameters. */ protected final ControlLoopOperationParams params; + @Getter + private final String fullName; + + @Getter + @Setter(AccessLevel.PROTECTED) + private String subRequestId; + + @Getter + private final List propertyNames; + + /** + * Values for the properties identified by {@link #getPropertyNames()}. + */ + private final Map properties = new HashMap<>(); + /** * Constructs the object. * * @param params operation parameters - * @param operator operator that created this operation + * @param config configuration for this operation + * @param propertyNames names of properties required by this operation */ - public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) { + protected OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List propertyNames) { this.params = params; - this.operator = operator; + this.config = config; + this.fullName = params.getActor() + "." + params.getOperation(); + this.propertyNames = propertyNames; } public Executor getBlockingExecutor() { - return operator.getBlockingExecutor(); - } - - public String getFullName() { - return operator.getFullName(); + return config.getBlockingExecutor(); } + @Override public String getActorName() { - return operator.getActorName(); + return params.getActor(); } + @Override public String getName() { - return operator.getName(); + return params.getOperation(); } @Override - public final CompletableFuture start() { - if (!operator.isAlive()) { - throw new IllegalStateException("operation is not running: " + getFullName()); - } - - // allocate a controller for the entire operation - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - - CompletableFuture preproc = startPreprocessorAsync(); - if (preproc == null) { - // no preprocessor required - just start the operation - return startOperationAttempt(controller, 1); - } + public boolean containsProperty(String name) { + return properties.containsKey(name); + } - /* - * Do preprocessor first and then, if successful, start the operation. Note: - * operations create their own outcome, ignoring the outcome from any previous - * steps. - * - * Wrap the preprocessor to ensure "stop" is propagated to it. - */ - // @formatter:off - controller.wrap(preproc) - .exceptionally(fromException("preprocessor of operation")) - .thenCompose(handlePreprocessorFailure(controller)) - .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1)) - .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); - // @formatter:on + @Override + public void setProperty(String name, Object value) { + logger.info("{}: set property {}={}", getFullName(), name, value); + properties.put(name, value); + } - return controller; + @SuppressWarnings("unchecked") + @Override + public T getProperty(String name) { + return (T) properties.get(name); } /** - * Handles a failure in the preprocessor pipeline. If a failure occurred, then it - * invokes the call-backs, marks the controller complete, and returns an incomplete - * future, effectively halting the pipeline. Otherwise, it returns the outcome that it - * received. - *

- * Assumes that no callbacks have been invoked yet. + * Gets a property value, throwing an exception if it's missing. * - * @param controller pipeline controller - * @return a function that checks the outcome status and continues, if successful, or - * indicates a failure otherwise + * @param name property name + * @param propertyType property type, used in an error message if the property value + * is {@code null} + * @return the property value */ - private Function> handlePreprocessorFailure( - PipelineControllerFuture controller) { - - return outcome -> { - - if (outcome != null && isSuccess(outcome)) { - logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(outcome); - } - - logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId()); - - final Executor executor = params.getExecutor(); - final CallbackManager callbacks = new CallbackManager(); - - // propagate "stop" to the callbacks - controller.add(callbacks); - - final OperationOutcome outcome2 = params.makeOutcome(); - - // TODO need a FAILURE_MISSING_DATA (e.g., A&AI) - - outcome2.setResult(PolicyResult.FAILURE_GUARD); - outcome2.setMessage(outcome != null ? outcome.getMessage() : null); - - // @formatter:off - CompletableFuture.completedFuture(outcome2) - .whenCompleteAsync(callbackStarted(callbacks), executor) - .whenCompleteAsync(callbackCompleted(callbacks), executor) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on + @SuppressWarnings("unchecked") + public T getRequiredProperty(String name, String propertyType) { + T value = (T) properties.get(name); + if (value == null) { + throw new IllegalStateException("missing " + propertyType); + } - return new CompletableFuture<>(); - }; + return value; } - /** - * Invokes the operation's preprocessor step(s) as a "future". This method simply - * invokes {@link #startGuardAsync()}. - *

- * This method assumes the following: - *

    - *
  • the operator is alive
  • - *
  • exceptions generated within the pipeline will be handled by the invoker
  • - *
- * - * @return a function that will start the preprocessor and returns its outcome, or - * {@code null} if this operation needs no preprocessor - */ - protected CompletableFuture startPreprocessorAsync() { - return startGuardAsync(); - } + @Override + public CompletableFuture start() { + // allocate a controller for the entire operation + final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - /** - * Invokes the operation's guard step(s) as a "future". This method simply returns - * {@code null}. - *

- * This method assumes the following: - *

    - *
  • the operator is alive
  • - *
  • exceptions generated within the pipeline will be handled by the invoker
  • - *
- * - * @return a function that will start the guard checks and returns its outcome, or - * {@code null} if this operation has no guard - */ - protected CompletableFuture startGuardAsync() { - return null; + // start attempt #1 + return startOperationAttempt(controller, 1); } /** - * Starts the operation attempt, with no preprocessor. When all retries complete, it - * will complete the controller. + * Starts the operation attempt. When all retries complete, it will complete the + * controller. * * @param controller controller for all operation attempts * @param attempt attempt number, typically starting with 1 @@ -220,6 +194,8 @@ public abstract class OperationPartial implements Operation { private CompletableFuture startOperationAttempt( PipelineControllerFuture controller, int attempt) { + generateSubRequestId(attempt); + // propagate "stop" to the operation attempt controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt)) .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); @@ -227,10 +203,20 @@ public abstract class OperationPartial implements Operation { return controller; } + /** + * Generates and sets {@link #subRequestId} to a new subrequest ID. + * + * @param attempt attempt number, typically starting with 1 + */ + public void generateSubRequestId(int attempt) { + // Note: this should be "protected", but that makes junits much messier + + setSubRequestId(UUID.randomUUID().toString()); + } + /** * Starts the operation attempt, without doing any retries. * - * @param params operation parameters * @param attempt attempt number, typically starting with 1 * @return a future that will return the result of a single operation attempt */ @@ -238,9 +224,9 @@ public abstract class OperationPartial implements Operation { logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId()); - final Executor executor = params.getExecutor(); - final OperationOutcome outcome = params.makeOutcome(); - final CallbackManager callbacks = new CallbackManager(); + final var executor = params.getExecutor(); + final var outcome = makeOutcome(); + final var callbacks = new CallbackManager(); // this operation attempt gets its own controller final PipelineControllerFuture controller = new PipelineControllerFuture<>(); @@ -287,7 +273,7 @@ public abstract class OperationPartial implements Operation { * @return {@code true} if the outcome was successful */ protected boolean isSuccess(OperationOutcome outcome) { - return (outcome.getResult() == PolicyResult.SUCCESS); + return (outcome != null && outcome.getResult() == OperationResult.SUCCESS); } /** @@ -298,7 +284,7 @@ public abstract class OperationPartial implements Operation { * and was associated with this operator, {@code false} otherwise */ protected boolean isActorFailed(OperationOutcome outcome) { - return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE); + return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE); } /** @@ -363,35 +349,47 @@ public abstract class OperationPartial implements Operation { */ private Function setRetryFlag(int attempt) { - return operation -> { - if (operation != null && !isActorFailed(operation)) { - /* - * wrong type or wrong operation - just leave it as is. No need to log - * anything here, as retryOnFailure() will log a message - */ - return operation; + return origOutcome -> { + // ensure we have a non-null outcome + OperationOutcome outcome; + if (origOutcome != null) { + outcome = origOutcome; + } else { + logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId()); + outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE); } - // get a non-null operation - OperationOutcome oper2; - if (operation != null) { - oper2 = operation; - } else { - oper2 = params.makeOutcome(); - oper2.setResult(PolicyResult.FAILURE); + // ensure correct actor/operation + outcome.setActor(getActorName()); + outcome.setOperation(getName()); + + // determine if we should retry, based on the result + if (outcome.getResult() != OperationResult.FAILURE) { + // do not retry success or other failure types (e.g., exception) + outcome.setFinalOutcome(true); + return outcome; } int retry = getRetry(params.getRetry()); - if (retry > 0 && attempt > retry) { + if (retry <= 0) { + // no retries were specified + outcome.setFinalOutcome(true); + + } else if (attempt <= retry) { + // have more retries - not the final outcome + outcome.setFinalOutcome(false); + + } else { /* - * retries were specified and we've already tried them all - change to + * retries were specified, and we've already tried them all - change to * FAILURE_RETRIES */ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId()); - oper2.setResult(PolicyResult.FAILURE_RETRIES); + outcome.setResult(OperationResult.FAILURE_RETRIES); + outcome.setFinalOutcome(true); } - return oper2; + return outcome; }; } @@ -457,10 +455,16 @@ public abstract class OperationPartial implements Operation { private Function fromException(String type) { return thrown -> { - OperationOutcome outcome = params.makeOutcome(); + OperationOutcome outcome = makeOutcome(); - logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), - params.getRequestId(), thrown); + if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) { + // do not include exception in the message, as it just clutters the log + logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), + params.getRequestId()); + } else { + logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), + params.getRequestId(), thrown); + } return setOutcome(outcome, thrown); }; @@ -470,103 +474,108 @@ public abstract class OperationPartial implements Operation { * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels * any outstanding futures when one completes. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled */ - protected CompletableFuture anyOf(List> futures) { - - // convert list to an array - @SuppressWarnings("rawtypes") - CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + public CompletableFuture anyOf( + @SuppressWarnings("unchecked") Supplier>... futureMakers) { - @SuppressWarnings("unchecked") - CompletableFuture result = anyOf(arrFutures); - return result; + return anyOf(Arrays.asList(futureMakers)); } /** - * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any - * outstanding futures when one completes. + * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels + * any outstanding futures when one completes. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled. Similarly, when this future completes, any incomplete futures + * will be canceled */ - protected CompletableFuture anyOf( - @SuppressWarnings("unchecked") CompletableFuture... futures) { + public CompletableFuture anyOf(List>> futureMakers) { + + PipelineControllerFuture controller = new PipelineControllerFuture<>(); + + CompletableFuture[] futures = + attachFutures(controller, futureMakers, UnaryOperator.identity()); + + if (futures.length == 0) { + // no futures were started + return null; + } if (futures.length == 1) { return futures[0]; } - final Executor executor = params.getExecutor(); - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - - attachFutures(controller, futures); - - // @formatter:off - CompletableFuture.anyOf(futures) - .thenApply(object -> (OperationOutcome) object) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on + CompletableFuture.anyOf(futures).thenApply(OperationOutcome.class::cast) + .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); return controller; } /** - * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels - * the futures if returned future is canceled. The future returns the "worst" outcome, - * based on priority (see {@link #detmPriority(OperationOutcome)}). + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled */ - protected CompletableFuture allOf(List> futures) { - - // convert list to an array - @SuppressWarnings("rawtypes") - CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]); + public CompletableFuture allOf( + @SuppressWarnings("unchecked") Supplier>... futureMakers) { - @SuppressWarnings("unchecked") - CompletableFuture result = allOf(arrFutures); - return result; + return allOf(Arrays.asList(futureMakers)); } /** - * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the - * futures if returned future is canceled. The future returns the "worst" outcome, - * based on priority (see {@link #detmPriority(OperationOutcome)}). + * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}. * - * @param futures futures for which to wait - * @return a future to cancel or await an outcome. If this future is canceled, then - * all of the futures will be canceled + * @param futureMakers function to make a future. If the function returns + * {@code null}, then no future is created for that function. On the other + * hand, if the function throws an exception, then the previously created + * functions are canceled and the exception is re-thrown + * @return a future to cancel or await an outcome, or {@code null} if no futures were + * created. If this future is canceled, then all of the futures will be + * canceled. Similarly, when this future completes, any incomplete futures + * will be canceled */ - protected CompletableFuture allOf( - @SuppressWarnings("unchecked") CompletableFuture... futures) { - - if (futures.length == 1) { - return futures[0]; - } - - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); + public CompletableFuture allOf(List>> futureMakers) { + PipelineControllerFuture controller = new PipelineControllerFuture<>(); - attachFutures(controller, futures); + Queue outcomes = new LinkedList<>(); - OperationOutcome[] outcomes = new OperationOutcome[futures.length]; + CompletableFuture[] futures = + attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> { + synchronized (outcomes) { + outcomes.add(outcome); + } + return outcome; + })); - @SuppressWarnings("rawtypes") - CompletableFuture[] futures2 = new CompletableFuture[futures.length]; + if (futures.length == 0) { + // no futures were started + return null; + } - // record the outcomes of each future when it completes - for (int count = 0; count < futures2.length; ++count) { - final int count2 = count; - futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2); + if (futures.length == 1) { + return futures[0]; } // @formatter:off - CompletableFuture.allOf(futures2) + CompletableFuture.allOf(futures) .thenApply(unused -> combineOutcomes(outcomes)) .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); // @formatter:on @@ -575,22 +584,60 @@ public abstract class OperationPartial implements Operation { } /** - * Attaches the given futures to the controller. + * Invokes the functions to create the futures and attaches them to the controller. * * @param controller master controller for all of the futures - * @param futures futures to be attached to the controller - */ - private void attachFutures(PipelineControllerFuture controller, - @SuppressWarnings("unchecked") CompletableFuture... futures) { + * @param futureMakers futures to be attached to the controller + * @param adorn function that "adorns" the future, possible adding onto its pipeline. + * Returns the adorned future + * @return an array of futures, possibly zero-length. If the array is of size one, + * then that one item should be returned instead of the controller + */ + @SuppressWarnings("unchecked") + private CompletableFuture[] attachFutures(PipelineControllerFuture controller, + List>> futureMakers, + UnaryOperator> adorn) { + + if (futureMakers.isEmpty()) { + return new CompletableFuture[0]; + } - if (futures.length == 0) { - throw new IllegalArgumentException("empty list of futures"); + // the last, unadorned future that is created + CompletableFuture lastFuture = null; + + List> futures = new ArrayList<>(futureMakers.size()); + + // make each future + for (var maker : futureMakers) { + try { + CompletableFuture future = maker.get(); + if (future == null) { + continue; + } + + // propagate "stop" to the future + controller.add(future); + + futures.add(adorn.apply(future)); + + lastFuture = future; + + } catch (RuntimeException e) { + logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId()); + controller.cancel(false); + throw e; + } } - // attach each task - for (CompletableFuture future : futures) { - controller.add(future); + var result = new CompletableFuture[futures.size()]; + + if (result.length == 1) { + // special case - return the unadorned future + result[0] = lastFuture; + return result; } + + return futures.toArray(result); } /** @@ -599,15 +646,13 @@ public abstract class OperationPartial implements Operation { * @param outcomes outcomes to be examined * @return the combined outcome */ - private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) { + private OperationOutcome combineOutcomes(Queue outcomes) { // identify the outcome with the highest priority - OperationOutcome outcome = outcomes[0]; + OperationOutcome outcome = outcomes.remove(); int priority = detmPriority(outcome); - // start with "1", as we've already dealt with "0" - for (int count = 1; count < outcomes.length; ++count) { - OperationOutcome outcome2 = outcomes[count]; + for (OperationOutcome outcome2 : outcomes) { int priority2 = detmPriority(outcome2); if (priority2 > priority) { @@ -656,71 +701,113 @@ public abstract class OperationPartial implements Operation { } /** - * Performs a task, after verifying that the controller is still running. Also checks - * that the previous outcome was successful, if specified. + * Performs a sequence of tasks, stopping if a task fails. A given task's future is + * not created until the previous task completes. The pipeline returns the outcome of + * the last task executed. * - * @param controller overall pipeline controller - * @param checkSuccess {@code true} to check the previous outcome, {@code false} - * otherwise - * @param outcome outcome of the previous task - * @param task task to be performed - * @return the task, if everything checks out. Otherwise, it returns an incomplete - * future and completes the controller instead + * @param futureMakers functions to make the futures + * @return a future to cancel the sequence or await the outcome */ - // @formatter:off - protected CompletableFuture doTask( - PipelineControllerFuture controller, - boolean checkSuccess, OperationOutcome outcome, - CompletableFuture task) { - // @formatter:on + public CompletableFuture sequence( + @SuppressWarnings("unchecked") Supplier>... futureMakers) { - if (checkSuccess && !isSuccess(outcome)) { - /* - * must complete before canceling so that cancel() doesn't cause controller to - * complete - */ - controller.complete(outcome); - task.cancel(false); - return new CompletableFuture<>(); + return sequence(Arrays.asList(futureMakers)); + } + + /** + * Performs a sequence of tasks, stopping if a task fails. A given task's future is + * not created until the previous task completes. The pipeline returns the outcome of + * the last task executed. + * + * @param futureMakers functions to make the futures + * @return a future to cancel the sequence or await the outcome, or {@code null} if + * there were no tasks to perform + */ + public CompletableFuture sequence( + List>> futureMakers) { + + Queue>> queue = new ArrayDeque<>(futureMakers); + + CompletableFuture nextTask = getNextTask(queue); + if (nextTask == null) { + // no tasks + return null; + } + + if (queue.isEmpty()) { + // only one task - just return it rather than wrapping it in a controller + return nextTask; } - return controller.wrap(task); + /* + * multiple tasks - need a controller to stop whichever task is currently + * executing + */ + final PipelineControllerFuture controller = new PipelineControllerFuture<>(); + final var executor = params.getExecutor(); + + // @formatter:off + controller.wrap(nextTask) + .thenCompose(nextTaskOnSuccess(controller, queue)) + .whenCompleteAsync(controller.delayedComplete(), executor); + // @formatter:on + + return controller; } /** - * Performs a task, after verifying that the controller is still running. Also checks - * that the previous outcome was successful, if specified. + * Executes the next task in the queue, if the previous outcome was successful. * - * @param controller overall pipeline controller - * @param checkSuccess {@code true} to check the previous outcome, {@code false} - * otherwise - * @param task function to start the task to be performed - * @return a function to perform the task. If everything checks out, then it returns - * the task. Otherwise, it returns an incomplete future and completes the - * controller instead + * @param controller pipeline controller + * @param taskQueue queue of tasks to be performed + * @return a future to execute the remaining tasks, or the current outcome, if it's a + * failure, or if there are no more tasks */ - // @formatter:off - protected Function> doTask( + private Function> nextTaskOnSuccess( PipelineControllerFuture controller, - boolean checkSuccess, - Function> task) { - // @formatter:on + Queue>> taskQueue) { return outcome -> { - - if (!controller.isRunning()) { - return new CompletableFuture<>(); + if (!isSuccess(outcome)) { + // return the failure + return CompletableFuture.completedFuture(outcome); } - if (checkSuccess && !isSuccess(outcome)) { - controller.complete(outcome); - return new CompletableFuture<>(); + CompletableFuture nextTask = getNextTask(taskQueue); + if (nextTask == null) { + // no tasks - just return the success + return CompletableFuture.completedFuture(outcome); } - return controller.wrap(task.apply(outcome)); + // @formatter:off + return controller + .wrap(nextTask) + .thenCompose(nextTaskOnSuccess(controller, taskQueue)); + // @formatter:on }; } + /** + * Gets the next task from the queue, skipping those that are {@code null}. + * + * @param taskQueue task queue + * @return the next task, or {@code null} if the queue is now empty + */ + private CompletableFuture getNextTask( + Queue>> taskQueue) { + + Supplier> maker; + + while ((maker = taskQueue.poll()) != null) { + CompletableFuture future = maker.get(); + if (future != null) { + return future; + } + } + + return null; + } + /** * Sets the start time of the operation and invokes the callback to indicate that the * operation has started. Does nothing if the pipeline has been stopped. @@ -730,15 +817,19 @@ public abstract class OperationPartial implements Operation { * @param callbacks used to determine if the start callback can be invoked * @return a function that sets the start time and invokes the callback */ - private BiConsumer callbackStarted(CallbackManager callbacks) { + protected BiConsumer callbackStarted(CallbackManager callbacks) { return (outcome, thrown) -> { if (callbacks.canStart()) { - // haven't invoked "start" callback yet + outcome.setSubRequestId(getSubRequestId()); outcome.setStart(callbacks.getStartTime()); outcome.setEnd(null); - params.callbackStarted(outcome); + + // pass a copy to the callback + var outcome2 = new OperationOutcome(outcome); + outcome2.setFinalOutcome(false); + params.callbackStarted(outcome2); } }; } @@ -756,14 +847,16 @@ public abstract class OperationPartial implements Operation { * @param callbacks used to determine if the end callback can be invoked * @return a function that sets the end time and invokes the callback */ - private BiConsumer callbackCompleted(CallbackManager callbacks) { + protected BiConsumer callbackCompleted(CallbackManager callbacks) { return (outcome, thrown) -> { - if (callbacks.canEnd()) { + outcome.setSubRequestId(getSubRequestId()); outcome.setStart(callbacks.getStartTime()); outcome.setEnd(callbacks.getEndTime()); - params.callbackCompleted(outcome); + + // pass a copy to the callback + params.callbackCompleted(new OperationOutcome(outcome)); } }; } @@ -774,8 +867,9 @@ public abstract class OperationPartial implements Operation { * @param operation operation to be updated * @return the updated operation */ - protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) { - PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION); + public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) { + OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT + : OperationResult.FAILURE_EXCEPTION); return setOutcome(operation, result); } @@ -786,15 +880,27 @@ public abstract class OperationPartial implements Operation { * @param result result of the operation * @return the updated operation */ - public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) { + public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) { logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId()); operation.setResult(result); - operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG + operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG : ControlLoopOperation.FAILED_MSG); return operation; } + /** + * Makes an outcome, populating the "target" field with the contents of the target + * entity property. + * + * @return a new operation outcome + */ + protected OperationOutcome makeOutcome() { + OperationOutcome outcome = params.makeOutcome(); + outcome.setTarget(getProperty(OperationProperties.AAI_TARGET_ENTITY)); + return outcome; + } + /** * Determines if a throwable is due to a timeout. * @@ -809,6 +915,54 @@ public abstract class OperationPartial implements Operation { return (thrown instanceof TimeoutException); } + /** + * Logs a message. If the message is not of type, String, then it attempts to + * pretty-print it into JSON before logging. + * + * @param direction IN or OUT + * @param infra communication infrastructure on which it was published + * @param source source name (e.g., the URL or Topic name) + * @param message message to be logged + * @return the JSON text that was logged + */ + public String logMessage(EventType direction, CommInfrastructure infra, String source, T message) { + String json; + try { + json = prettyPrint(message); + + } catch (IllegalArgumentException e) { + String type = (direction == EventType.IN ? "response" : "request"); + logger.warn("cannot pretty-print {}", type, e); + json = message.toString(); + } + + logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json); + + return json; + } + + /** + * Converts a message to a "pretty-printed" String using the operation's normal + * serialization provider (i.e., it's coder). + * + * @param message response to be logged + * @return the JSON text that was logged + * @throws IllegalArgumentException if the message cannot be converted + */ + public String prettyPrint(T message) { + if (message == null) { + return null; + } else if (message instanceof String) { + return message.toString(); + } else { + try { + return getCoder().encode(message, true); + } catch (CoderException e) { + throw new IllegalArgumentException("cannot encode message", e); + } + } + } + // these may be overridden by subclasses or junit tests /** @@ -841,4 +995,10 @@ public abstract class OperationPartial implements Operation { protected long getTimeoutMs(Integer timeoutSec) { return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS)); } + + // these may be overridden by junit tests + + protected Coder getCoder() { + return coder; + } }