X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=models-interactions%2Fmodel-actors%2FactorServiceProvider%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fcontrolloop%2Factorserviceprovider%2Fimpl%2FOperationPartial.java;h=c19ad6c319c9d5de425b1f2b710430ec14d0ab6f;hb=49f07db935d114b72a44e446867b16262dd552aa;hp=d00b88bb5ff2c6cbd3ae437608ea941bb6fd5488;hpb=ad1cd2013f45da5764fc9610db1f679d3c3762cb;p=policy%2Fmodels.git
diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
index d00b88bb5..c19ad6c31 100644
--- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
+++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,16 @@
package org.onap.policy.controlloop.actorserviceprovider.impl;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
@@ -28,13 +37,26 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,176 +64,128 @@ import org.slf4j.LoggerFactory;
* Partial implementation of an operator. In general, it's preferable that subclasses
* would override {@link #startOperationAsync(int, OperationOutcome)
* startOperationAsync()}. However, if that proves to be too difficult, then they can
- * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
- * if the operation requires any preprocessor steps, the subclass may choose to override
- * {@link #startPreprocessorAsync()}.
+ * simply override {@link #doOperation(int, OperationOutcome) doOperation()}.
*
* The futures returned by the methods within this class can be canceled, and will
* propagate the cancellation to any subtasks. Thus it is also expected that any futures
* returned by overridden methods will do the same. Of course, if a class overrides
* {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
* be done to cancel that particular operation.
+ *
+ * In general tasks in a pipeline are executed by the same thread. However, the following
+ * should always be executed via the executor specified in "params":
+ *
+ * - start callback
+ * - completion callback
+ * - controller completion (i.e., delayedComplete())
+ *
*/
public abstract class OperationPartial implements Operation {
-
private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
- public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
+ private static final Coder coder = new StandardCoder();
- // values extracted from the operator
+ public static final String GUARD_ACTOR_NAME = "GUARD";
+ public static final String GUARD_OPERATION_NAME = "Decision";
+ public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
- private final OperatorPartial operator;
+ private final OperatorConfig config;
/**
* Operation parameters.
*/
protected final ControlLoopOperationParams params;
+ @Getter
+ private final String fullName;
+
+ @Getter
+ @Setter(AccessLevel.PROTECTED)
+ private String subRequestId;
+
+ @Getter
+ private final List propertyNames;
+
+ /**
+ * Values for the properties identified by {@link #getPropertyNames()}.
+ */
+ private final Map properties = new HashMap<>();
+
/**
* Constructs the object.
*
* @param params operation parameters
- * @param operator operator that created this operation
+ * @param config configuration for this operation
+ * @param propertyNames names of properties required by this operation
*/
- public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) {
+ protected OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List propertyNames) {
this.params = params;
- this.operator = operator;
+ this.config = config;
+ this.fullName = params.getActor() + "." + params.getOperation();
+ this.propertyNames = propertyNames;
}
public Executor getBlockingExecutor() {
- return operator.getBlockingExecutor();
- }
-
- public String getFullName() {
- return operator.getFullName();
+ return config.getBlockingExecutor();
}
+ @Override
public String getActorName() {
- return operator.getActorName();
+ return params.getActor();
}
+ @Override
public String getName() {
- return operator.getName();
+ return params.getOperation();
}
@Override
- public final CompletableFuture start() {
- if (!operator.isAlive()) {
- throw new IllegalStateException("operation is not running: " + getFullName());
- }
-
- // allocate a controller for the entire operation
- final PipelineControllerFuture controller = new PipelineControllerFuture<>();
-
- CompletableFuture preproc = startPreprocessorAsync();
- if (preproc == null) {
- // no preprocessor required - just start the operation
- return startOperationAttempt(controller, 1);
- }
+ public boolean containsProperty(String name) {
+ return properties.containsKey(name);
+ }
- /*
- * Do preprocessor first and then, if successful, start the operation. Note:
- * operations create their own outcome, ignoring the outcome from any previous
- * steps.
- *
- * Wrap the preprocessor to ensure "stop" is propagated to it.
- */
- // @formatter:off
- controller.wrap(preproc)
- .exceptionally(fromException("preprocessor of operation"))
- .thenCompose(handlePreprocessorFailure(controller))
- .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
- .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
- // @formatter:on
+ @Override
+ public void setProperty(String name, Object value) {
+ logger.info("{}: set property {}={}", getFullName(), name, value);
+ properties.put(name, value);
+ }
- return controller;
+ @SuppressWarnings("unchecked")
+ @Override
+ public T getProperty(String name) {
+ return (T) properties.get(name);
}
/**
- * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
- * invokes the call-backs, marks the controller complete, and returns an incomplete
- * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
- * received.
- *
- * Assumes that no callbacks have been invoked yet.
+ * Gets a property value, throwing an exception if it's missing.
*
- * @param controller pipeline controller
- * @return a function that checks the outcome status and continues, if successful, or
- * indicates a failure otherwise
+ * @param name property name
+ * @param propertyType property type, used in an error message if the property value
+ * is {@code null}
+ * @return the property value
*/
- private Function> handlePreprocessorFailure(
- PipelineControllerFuture controller) {
-
- return outcome -> {
-
- if (outcome != null && isSuccess(outcome)) {
- logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(outcome);
- }
-
- logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
-
- final Executor executor = params.getExecutor();
- final CallbackManager callbacks = new CallbackManager();
-
- // propagate "stop" to the callbacks
- controller.add(callbacks);
-
- final OperationOutcome outcome2 = params.makeOutcome();
-
- // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
-
- outcome2.setResult(PolicyResult.FAILURE_GUARD);
- outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
-
- // @formatter:off
- CompletableFuture.completedFuture(outcome2)
- .whenCompleteAsync(callbackStarted(callbacks), executor)
- .whenCompleteAsync(callbackCompleted(callbacks), executor)
- .whenCompleteAsync(controller.delayedComplete(), executor);
- // @formatter:on
+ @SuppressWarnings("unchecked")
+ public T getRequiredProperty(String name, String propertyType) {
+ T value = (T) properties.get(name);
+ if (value == null) {
+ throw new IllegalStateException("missing " + propertyType);
+ }
- return new CompletableFuture<>();
- };
+ return value;
}
- /**
- * Invokes the operation's preprocessor step(s) as a "future". This method simply
- * invokes {@link #startGuardAsync()}.
- *
- * This method assumes the following:
- *
- * - the operator is alive
- * - exceptions generated within the pipeline will be handled by the invoker
- *
- *
- * @return a function that will start the preprocessor and returns its outcome, or
- * {@code null} if this operation needs no preprocessor
- */
- protected CompletableFuture startPreprocessorAsync() {
- return startGuardAsync();
- }
+ @Override
+ public CompletableFuture start() {
+ // allocate a controller for the entire operation
+ final PipelineControllerFuture controller = new PipelineControllerFuture<>();
- /**
- * Invokes the operation's guard step(s) as a "future". This method simply returns
- * {@code null}.
- *
- * This method assumes the following:
- *
- * - the operator is alive
- * - exceptions generated within the pipeline will be handled by the invoker
- *
- *
- * @return a function that will start the guard checks and returns its outcome, or
- * {@code null} if this operation has no guard
- */
- protected CompletableFuture startGuardAsync() {
- return null;
+ // start attempt #1
+ return startOperationAttempt(controller, 1);
}
/**
- * Starts the operation attempt, with no preprocessor. When all retries complete, it
- * will complete the controller.
+ * Starts the operation attempt. When all retries complete, it will complete the
+ * controller.
*
* @param controller controller for all operation attempts
* @param attempt attempt number, typically starting with 1
@@ -220,6 +194,8 @@ public abstract class OperationPartial implements Operation {
private CompletableFuture startOperationAttempt(
PipelineControllerFuture controller, int attempt) {
+ generateSubRequestId(attempt);
+
// propagate "stop" to the operation attempt
controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
.whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
@@ -227,10 +203,20 @@ public abstract class OperationPartial implements Operation {
return controller;
}
+ /**
+ * Generates and sets {@link #subRequestId} to a new subrequest ID.
+ *
+ * @param attempt attempt number, typically starting with 1
+ */
+ public void generateSubRequestId(int attempt) {
+ // Note: this should be "protected", but that makes junits much messier
+
+ setSubRequestId(UUID.randomUUID().toString());
+ }
+
/**
* Starts the operation attempt, without doing any retries.
*
- * @param params operation parameters
* @param attempt attempt number, typically starting with 1
* @return a future that will return the result of a single operation attempt
*/
@@ -238,9 +224,9 @@ public abstract class OperationPartial implements Operation {
logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
- final Executor executor = params.getExecutor();
- final OperationOutcome outcome = params.makeOutcome();
- final CallbackManager callbacks = new CallbackManager();
+ final var executor = params.getExecutor();
+ final var outcome = makeOutcome();
+ final var callbacks = new CallbackManager();
// this operation attempt gets its own controller
final PipelineControllerFuture controller = new PipelineControllerFuture<>();
@@ -287,7 +273,7 @@ public abstract class OperationPartial implements Operation {
* @return {@code true} if the outcome was successful
*/
protected boolean isSuccess(OperationOutcome outcome) {
- return (outcome.getResult() == PolicyResult.SUCCESS);
+ return (outcome != null && outcome.getResult() == OperationResult.SUCCESS);
}
/**
@@ -298,7 +284,7 @@ public abstract class OperationPartial implements Operation {
* and was associated with this operator, {@code false} otherwise
*/
protected boolean isActorFailed(OperationOutcome outcome) {
- return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
+ return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE);
}
/**
@@ -363,35 +349,47 @@ public abstract class OperationPartial implements Operation {
*/
private Function setRetryFlag(int attempt) {
- return operation -> {
- if (operation != null && !isActorFailed(operation)) {
- /*
- * wrong type or wrong operation - just leave it as is. No need to log
- * anything here, as retryOnFailure() will log a message
- */
- return operation;
+ return origOutcome -> {
+ // ensure we have a non-null outcome
+ OperationOutcome outcome;
+ if (origOutcome != null) {
+ outcome = origOutcome;
+ } else {
+ logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
+ outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE);
}
- // get a non-null operation
- OperationOutcome oper2;
- if (operation != null) {
- oper2 = operation;
- } else {
- oper2 = params.makeOutcome();
- oper2.setResult(PolicyResult.FAILURE);
+ // ensure correct actor/operation
+ outcome.setActor(getActorName());
+ outcome.setOperation(getName());
+
+ // determine if we should retry, based on the result
+ if (outcome.getResult() != OperationResult.FAILURE) {
+ // do not retry success or other failure types (e.g., exception)
+ outcome.setFinalOutcome(true);
+ return outcome;
}
int retry = getRetry(params.getRetry());
- if (retry > 0 && attempt > retry) {
+ if (retry <= 0) {
+ // no retries were specified
+ outcome.setFinalOutcome(true);
+
+ } else if (attempt <= retry) {
+ // have more retries - not the final outcome
+ outcome.setFinalOutcome(false);
+
+ } else {
/*
- * retries were specified and we've already tried them all - change to
+ * retries were specified, and we've already tried them all - change to
* FAILURE_RETRIES
*/
logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
- oper2.setResult(PolicyResult.FAILURE_RETRIES);
+ outcome.setResult(OperationResult.FAILURE_RETRIES);
+ outcome.setFinalOutcome(true);
}
- return oper2;
+ return outcome;
};
}
@@ -457,10 +455,16 @@ public abstract class OperationPartial implements Operation {
private Function fromException(String type) {
return thrown -> {
- OperationOutcome outcome = params.makeOutcome();
+ OperationOutcome outcome = makeOutcome();
- logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
- params.getRequestId(), thrown);
+ if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
+ // do not include exception in the message, as it just clutters the log
+ logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+ params.getRequestId());
+ } else {
+ logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+ params.getRequestId(), thrown);
+ }
return setOutcome(outcome, thrown);
};
@@ -470,103 +474,108 @@ public abstract class OperationPartial implements Operation {
* Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
* any outstanding futures when one completes.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled
*/
- protected CompletableFuture anyOf(List> futures) {
-
- // convert list to an array
- @SuppressWarnings("rawtypes")
- CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+ public CompletableFuture anyOf(
+ @SuppressWarnings("unchecked") Supplier>... futureMakers) {
- @SuppressWarnings("unchecked")
- CompletableFuture result = anyOf(arrFutures);
- return result;
+ return anyOf(Arrays.asList(futureMakers));
}
/**
- * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
- * outstanding futures when one completes.
+ * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
+ * any outstanding futures when one completes.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled. Similarly, when this future completes, any incomplete futures
+ * will be canceled
*/
- protected CompletableFuture anyOf(
- @SuppressWarnings("unchecked") CompletableFuture... futures) {
+ public CompletableFuture anyOf(List>> futureMakers) {
+
+ PipelineControllerFuture controller = new PipelineControllerFuture<>();
+
+ CompletableFuture[] futures =
+ attachFutures(controller, futureMakers, UnaryOperator.identity());
+
+ if (futures.length == 0) {
+ // no futures were started
+ return null;
+ }
if (futures.length == 1) {
return futures[0];
}
- final Executor executor = params.getExecutor();
- final PipelineControllerFuture controller = new PipelineControllerFuture<>();
-
- attachFutures(controller, futures);
-
- // @formatter:off
- CompletableFuture.anyOf(futures)
- .thenApply(object -> (OperationOutcome) object)
- .whenCompleteAsync(controller.delayedComplete(), executor);
- // @formatter:on
+ CompletableFuture.anyOf(futures).thenApply(OperationOutcome.class::cast)
+ .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
return controller;
}
/**
- * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
- * the futures if returned future is canceled. The future returns the "worst" outcome,
- * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled
*/
- protected CompletableFuture allOf(List> futures) {
-
- // convert list to an array
- @SuppressWarnings("rawtypes")
- CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+ public CompletableFuture allOf(
+ @SuppressWarnings("unchecked") Supplier>... futureMakers) {
- @SuppressWarnings("unchecked")
- CompletableFuture result = allOf(arrFutures);
- return result;
+ return allOf(Arrays.asList(futureMakers));
}
/**
- * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
- * futures if returned future is canceled. The future returns the "worst" outcome,
- * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled. Similarly, when this future completes, any incomplete futures
+ * will be canceled
*/
- protected CompletableFuture allOf(
- @SuppressWarnings("unchecked") CompletableFuture... futures) {
-
- if (futures.length == 1) {
- return futures[0];
- }
-
- final PipelineControllerFuture controller = new PipelineControllerFuture<>();
+ public CompletableFuture allOf(List>> futureMakers) {
+ PipelineControllerFuture controller = new PipelineControllerFuture<>();
- attachFutures(controller, futures);
+ Queue outcomes = new LinkedList<>();
- OperationOutcome[] outcomes = new OperationOutcome[futures.length];
+ CompletableFuture[] futures =
+ attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
+ synchronized (outcomes) {
+ outcomes.add(outcome);
+ }
+ return outcome;
+ }));
- @SuppressWarnings("rawtypes")
- CompletableFuture[] futures2 = new CompletableFuture[futures.length];
+ if (futures.length == 0) {
+ // no futures were started
+ return null;
+ }
- // record the outcomes of each future when it completes
- for (int count = 0; count < futures2.length; ++count) {
- final int count2 = count;
- futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
+ if (futures.length == 1) {
+ return futures[0];
}
// @formatter:off
- CompletableFuture.allOf(futures2)
+ CompletableFuture.allOf(futures)
.thenApply(unused -> combineOutcomes(outcomes))
.whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
// @formatter:on
@@ -575,22 +584,60 @@ public abstract class OperationPartial implements Operation {
}
/**
- * Attaches the given futures to the controller.
+ * Invokes the functions to create the futures and attaches them to the controller.
*
* @param controller master controller for all of the futures
- * @param futures futures to be attached to the controller
- */
- private void attachFutures(PipelineControllerFuture controller,
- @SuppressWarnings("unchecked") CompletableFuture... futures) {
+ * @param futureMakers futures to be attached to the controller
+ * @param adorn function that "adorns" the future, possible adding onto its pipeline.
+ * Returns the adorned future
+ * @return an array of futures, possibly zero-length. If the array is of size one,
+ * then that one item should be returned instead of the controller
+ */
+ @SuppressWarnings("unchecked")
+ private CompletableFuture[] attachFutures(PipelineControllerFuture controller,
+ List>> futureMakers,
+ UnaryOperator> adorn) {
+
+ if (futureMakers.isEmpty()) {
+ return new CompletableFuture[0];
+ }
- if (futures.length == 0) {
- throw new IllegalArgumentException("empty list of futures");
+ // the last, unadorned future that is created
+ CompletableFuture lastFuture = null;
+
+ List> futures = new ArrayList<>(futureMakers.size());
+
+ // make each future
+ for (var maker : futureMakers) {
+ try {
+ CompletableFuture future = maker.get();
+ if (future == null) {
+ continue;
+ }
+
+ // propagate "stop" to the future
+ controller.add(future);
+
+ futures.add(adorn.apply(future));
+
+ lastFuture = future;
+
+ } catch (RuntimeException e) {
+ logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
+ controller.cancel(false);
+ throw e;
+ }
}
- // attach each task
- for (CompletableFuture future : futures) {
- controller.add(future);
+ var result = new CompletableFuture[futures.size()];
+
+ if (result.length == 1) {
+ // special case - return the unadorned future
+ result[0] = lastFuture;
+ return result;
}
+
+ return futures.toArray(result);
}
/**
@@ -599,15 +646,13 @@ public abstract class OperationPartial implements Operation {
* @param outcomes outcomes to be examined
* @return the combined outcome
*/
- private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) {
+ private OperationOutcome combineOutcomes(Queue outcomes) {
// identify the outcome with the highest priority
- OperationOutcome outcome = outcomes[0];
+ OperationOutcome outcome = outcomes.remove();
int priority = detmPriority(outcome);
- // start with "1", as we've already dealt with "0"
- for (int count = 1; count < outcomes.length; ++count) {
- OperationOutcome outcome2 = outcomes[count];
+ for (OperationOutcome outcome2 : outcomes) {
int priority2 = detmPriority(outcome2);
if (priority2 > priority) {
@@ -656,71 +701,113 @@ public abstract class OperationPartial implements Operation {
}
/**
- * Performs a task, after verifying that the controller is still running. Also checks
- * that the previous outcome was successful, if specified.
+ * Performs a sequence of tasks, stopping if a task fails. A given task's future is
+ * not created until the previous task completes. The pipeline returns the outcome of
+ * the last task executed.
*
- * @param controller overall pipeline controller
- * @param checkSuccess {@code true} to check the previous outcome, {@code false}
- * otherwise
- * @param outcome outcome of the previous task
- * @param task task to be performed
- * @return the task, if everything checks out. Otherwise, it returns an incomplete
- * future and completes the controller instead
+ * @param futureMakers functions to make the futures
+ * @return a future to cancel the sequence or await the outcome
*/
- // @formatter:off
- protected CompletableFuture doTask(
- PipelineControllerFuture controller,
- boolean checkSuccess, OperationOutcome outcome,
- CompletableFuture task) {
- // @formatter:on
+ public CompletableFuture sequence(
+ @SuppressWarnings("unchecked") Supplier>... futureMakers) {
- if (checkSuccess && !isSuccess(outcome)) {
- /*
- * must complete before canceling so that cancel() doesn't cause controller to
- * complete
- */
- controller.complete(outcome);
- task.cancel(false);
- return new CompletableFuture<>();
+ return sequence(Arrays.asList(futureMakers));
+ }
+
+ /**
+ * Performs a sequence of tasks, stopping if a task fails. A given task's future is
+ * not created until the previous task completes. The pipeline returns the outcome of
+ * the last task executed.
+ *
+ * @param futureMakers functions to make the futures
+ * @return a future to cancel the sequence or await the outcome, or {@code null} if
+ * there were no tasks to perform
+ */
+ public CompletableFuture sequence(
+ List>> futureMakers) {
+
+ Queue>> queue = new ArrayDeque<>(futureMakers);
+
+ CompletableFuture nextTask = getNextTask(queue);
+ if (nextTask == null) {
+ // no tasks
+ return null;
+ }
+
+ if (queue.isEmpty()) {
+ // only one task - just return it rather than wrapping it in a controller
+ return nextTask;
}
- return controller.wrap(task);
+ /*
+ * multiple tasks - need a controller to stop whichever task is currently
+ * executing
+ */
+ final PipelineControllerFuture controller = new PipelineControllerFuture<>();
+ final var executor = params.getExecutor();
+
+ // @formatter:off
+ controller.wrap(nextTask)
+ .thenCompose(nextTaskOnSuccess(controller, queue))
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ return controller;
}
/**
- * Performs a task, after verifying that the controller is still running. Also checks
- * that the previous outcome was successful, if specified.
+ * Executes the next task in the queue, if the previous outcome was successful.
*
- * @param controller overall pipeline controller
- * @param checkSuccess {@code true} to check the previous outcome, {@code false}
- * otherwise
- * @param task function to start the task to be performed
- * @return a function to perform the task. If everything checks out, then it returns
- * the task. Otherwise, it returns an incomplete future and completes the
- * controller instead
+ * @param controller pipeline controller
+ * @param taskQueue queue of tasks to be performed
+ * @return a future to execute the remaining tasks, or the current outcome, if it's a
+ * failure, or if there are no more tasks
*/
- // @formatter:off
- protected Function> doTask(
+ private Function> nextTaskOnSuccess(
PipelineControllerFuture controller,
- boolean checkSuccess,
- Function> task) {
- // @formatter:on
+ Queue>> taskQueue) {
return outcome -> {
-
- if (!controller.isRunning()) {
- return new CompletableFuture<>();
+ if (!isSuccess(outcome)) {
+ // return the failure
+ return CompletableFuture.completedFuture(outcome);
}
- if (checkSuccess && !isSuccess(outcome)) {
- controller.complete(outcome);
- return new CompletableFuture<>();
+ CompletableFuture nextTask = getNextTask(taskQueue);
+ if (nextTask == null) {
+ // no tasks - just return the success
+ return CompletableFuture.completedFuture(outcome);
}
- return controller.wrap(task.apply(outcome));
+ // @formatter:off
+ return controller
+ .wrap(nextTask)
+ .thenCompose(nextTaskOnSuccess(controller, taskQueue));
+ // @formatter:on
};
}
+ /**
+ * Gets the next task from the queue, skipping those that are {@code null}.
+ *
+ * @param taskQueue task queue
+ * @return the next task, or {@code null} if the queue is now empty
+ */
+ private CompletableFuture getNextTask(
+ Queue>> taskQueue) {
+
+ Supplier> maker;
+
+ while ((maker = taskQueue.poll()) != null) {
+ CompletableFuture future = maker.get();
+ if (future != null) {
+ return future;
+ }
+ }
+
+ return null;
+ }
+
/**
* Sets the start time of the operation and invokes the callback to indicate that the
* operation has started. Does nothing if the pipeline has been stopped.
@@ -730,15 +817,19 @@ public abstract class OperationPartial implements Operation {
* @param callbacks used to determine if the start callback can be invoked
* @return a function that sets the start time and invokes the callback
*/
- private BiConsumer callbackStarted(CallbackManager callbacks) {
+ protected BiConsumer callbackStarted(CallbackManager callbacks) {
return (outcome, thrown) -> {
if (callbacks.canStart()) {
- // haven't invoked "start" callback yet
+ outcome.setSubRequestId(getSubRequestId());
outcome.setStart(callbacks.getStartTime());
outcome.setEnd(null);
- params.callbackStarted(outcome);
+
+ // pass a copy to the callback
+ var outcome2 = new OperationOutcome(outcome);
+ outcome2.setFinalOutcome(false);
+ params.callbackStarted(outcome2);
}
};
}
@@ -756,14 +847,16 @@ public abstract class OperationPartial implements Operation {
* @param callbacks used to determine if the end callback can be invoked
* @return a function that sets the end time and invokes the callback
*/
- private BiConsumer callbackCompleted(CallbackManager callbacks) {
+ protected BiConsumer callbackCompleted(CallbackManager callbacks) {
return (outcome, thrown) -> {
-
if (callbacks.canEnd()) {
+ outcome.setSubRequestId(getSubRequestId());
outcome.setStart(callbacks.getStartTime());
outcome.setEnd(callbacks.getEndTime());
- params.callbackCompleted(outcome);
+
+ // pass a copy to the callback
+ params.callbackCompleted(new OperationOutcome(outcome));
}
};
}
@@ -774,8 +867,9 @@ public abstract class OperationPartial implements Operation {
* @param operation operation to be updated
* @return the updated operation
*/
- protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
- PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
+ public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
+ OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT
+ : OperationResult.FAILURE_EXCEPTION);
return setOutcome(operation, result);
}
@@ -786,15 +880,27 @@ public abstract class OperationPartial implements Operation {
* @param result result of the operation
* @return the updated operation
*/
- public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
+ public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) {
logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
operation.setResult(result);
- operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
+ operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
: ControlLoopOperation.FAILED_MSG);
return operation;
}
+ /**
+ * Makes an outcome, populating the "target" field with the contents of the target
+ * entity property.
+ *
+ * @return a new operation outcome
+ */
+ protected OperationOutcome makeOutcome() {
+ OperationOutcome outcome = params.makeOutcome();
+ outcome.setTarget(getProperty(OperationProperties.AAI_TARGET_ENTITY));
+ return outcome;
+ }
+
/**
* Determines if a throwable is due to a timeout.
*
@@ -809,6 +915,54 @@ public abstract class OperationPartial implements Operation {
return (thrown instanceof TimeoutException);
}
+ /**
+ * Logs a message. If the message is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param direction IN or OUT
+ * @param infra communication infrastructure on which it was published
+ * @param source source name (e.g., the URL or Topic name)
+ * @param message message to be logged
+ * @return the JSON text that was logged
+ */
+ public String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
+ String json;
+ try {
+ json = prettyPrint(message);
+
+ } catch (IllegalArgumentException e) {
+ String type = (direction == EventType.IN ? "response" : "request");
+ logger.warn("cannot pretty-print {}", type, e);
+ json = message.toString();
+ }
+
+ logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
+
+ return json;
+ }
+
+ /**
+ * Converts a message to a "pretty-printed" String using the operation's normal
+ * serialization provider (i.e., it's coder).
+ *
+ * @param message response to be logged
+ * @return the JSON text that was logged
+ * @throws IllegalArgumentException if the message cannot be converted
+ */
+ public String prettyPrint(T message) {
+ if (message == null) {
+ return null;
+ } else if (message instanceof String) {
+ return message.toString();
+ } else {
+ try {
+ return getCoder().encode(message, true);
+ } catch (CoderException e) {
+ throw new IllegalArgumentException("cannot encode message", e);
+ }
+ }
+ }
+
// these may be overridden by subclasses or junit tests
/**
@@ -841,4 +995,10 @@ public abstract class OperationPartial implements Operation {
protected long getTimeoutMs(Integer timeoutSec) {
return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
}
+
+ // these may be overridden by junit tests
+
+ protected Coder getCoder() {
+ return coder;
+ }
}