X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=models-interactions%2Fmodel-actors%2FactorServiceProvider%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fcontrolloop%2Factorserviceprovider%2Fimpl%2FOperationPartial.java;h=e9f6b024c6ae33c2860dbb35fa0d325fc69361d1;hb=19ef8b24a98c09a349e6ae7309f535a0135463f6;hp=0b349719757601ea3990cb2fb33ff8d8d3515bff;hpb=88fb2e33c81fa0d69846e8d0d218b0ef4015a4ba;p=policy%2Fmodels.git diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java index 0b3497197..e9f6b024c 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java @@ -23,9 +23,13 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -35,6 +39,9 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.function.UnaryOperator; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.utils.NetLoggerUtil; import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType; @@ -45,9 +52,11 @@ import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; import org.onap.policy.controlloop.actorserviceprovider.Operation; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.OperationProperties; +import org.onap.policy.controlloop.actorserviceprovider.OperationResult; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig; import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; -import org.onap.policy.controlloop.policy.PolicyResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,176 +64,128 @@ import org.slf4j.LoggerFactory; * Partial implementation of an operator. In general, it's preferable that subclasses * would override {@link #startOperationAsync(int, OperationOutcome) * startOperationAsync()}. However, if that proves to be too difficult, then they can - * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition, - * if the operation requires any preprocessor steps, the subclass may choose to override - * {@link #startPreprocessorAsync()}. + * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. *

* The futures returned by the methods within this class can be canceled, and will * propagate the cancellation to any subtasks. Thus it is also expected that any futures * returned by overridden methods will do the same. Of course, if a class overrides * {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can * be done to cancel that particular operation. + *

+ * In general tasks in a pipeline are executed by the same thread. However, the following + * should always be executed via the executor specified in "params": + *

*/ public abstract class OperationPartial implements Operation { private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class); private static final Coder coder = new StandardCoder(); - public static final long DEFAULT_RETRY_WAIT_MS = 1000L; - // values extracted from the operator + public static final String GUARD_ACTOR_NAME = "GUARD"; + public static final String GUARD_OPERATION_NAME = "Decision"; + public static final long DEFAULT_RETRY_WAIT_MS = 1000L; - private final OperatorPartial operator; + private final OperatorConfig config; /** * Operation parameters. */ protected final ControlLoopOperationParams params; + @Getter + private final String fullName; + + @Getter + @Setter(AccessLevel.PROTECTED) + private String subRequestId; + + @Getter + private final List propertyNames; + + /** + * Values for the properties identified by {@link #getPropertyNames()}. + */ + private final Map properties = new HashMap<>(); + /** * Constructs the object. * * @param params operation parameters - * @param operator operator that created this operation + * @param config configuration for this operation + * @param propertyNames names of properties required by this operation */ - public OperationPartial(ControlLoopOperationParams params, OperatorPartial operator) { + public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List propertyNames) { this.params = params; - this.operator = operator; + this.config = config; + this.fullName = params.getActor() + "." + params.getOperation(); + this.propertyNames = propertyNames; } public Executor getBlockingExecutor() { - return operator.getBlockingExecutor(); - } - - public String getFullName() { - return operator.getFullName(); + return config.getBlockingExecutor(); } + @Override public String getActorName() { - return operator.getActorName(); + return params.getActor(); } + @Override public String getName() { - return operator.getName(); + return params.getOperation(); } @Override - public final CompletableFuture start() { - if (!operator.isAlive()) { - throw new IllegalStateException("operation is not running: " + getFullName()); - } - - // allocate a controller for the entire operation - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - - CompletableFuture preproc = startPreprocessorAsync(); - if (preproc == null) { - // no preprocessor required - just start the operation - return startOperationAttempt(controller, 1); - } + public boolean containsProperty(String name) { + return properties.containsKey(name); + } - /* - * Do preprocessor first and then, if successful, start the operation. Note: - * operations create their own outcome, ignoring the outcome from any previous - * steps. - * - * Wrap the preprocessor to ensure "stop" is propagated to it. - */ - // @formatter:off - controller.wrap(preproc) - .exceptionally(fromException("preprocessor of operation")) - .thenCompose(handlePreprocessorFailure(controller)) - .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1)) - .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); - // @formatter:on + @Override + public void setProperty(String name, Object value) { + logger.info("{}: set property {}={}", getFullName(), name, value); + properties.put(name, value); + } - return controller; + @SuppressWarnings("unchecked") + @Override + public T getProperty(String name) { + return (T) properties.get(name); } /** - * Handles a failure in the preprocessor pipeline. If a failure occurred, then it - * invokes the call-backs, marks the controller complete, and returns an incomplete - * future, effectively halting the pipeline. Otherwise, it returns the outcome that it - * received. - *

- * Assumes that no callbacks have been invoked yet. + * Gets a property value, throwing an exception if it's missing. * - * @param controller pipeline controller - * @return a function that checks the outcome status and continues, if successful, or - * indicates a failure otherwise + * @param name property name + * @param propertyType property type, used in an error message if the property value + * is {@code null} + * @return the property value */ - private Function> handlePreprocessorFailure( - PipelineControllerFuture controller) { - - return outcome -> { - - if (outcome != null && isSuccess(outcome)) { - logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(outcome); - } - - logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId()); - - final Executor executor = params.getExecutor(); - final CallbackManager callbacks = new CallbackManager(); - - // propagate "stop" to the callbacks - controller.add(callbacks); - - final OperationOutcome outcome2 = params.makeOutcome(); - - // TODO need a FAILURE_MISSING_DATA (e.g., A&AI) - - outcome2.setResult(PolicyResult.FAILURE_GUARD); - outcome2.setMessage(outcome != null ? outcome.getMessage() : null); - - // @formatter:off - CompletableFuture.completedFuture(outcome2) - .whenCompleteAsync(callbackStarted(callbacks), executor) - .whenCompleteAsync(callbackCompleted(callbacks), executor) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on + @SuppressWarnings("unchecked") + protected T getRequiredProperty(String name, String propertyType) { + T value = (T) properties.get(name); + if (value == null) { + throw new IllegalStateException("missing " + propertyType); + } - return new CompletableFuture<>(); - }; + return value; } - /** - * Invokes the operation's preprocessor step(s) as a "future". This method simply - * invokes {@link #startGuardAsync()}. - *

- * This method assumes the following: - *

    - *
  • the operator is alive
  • - *
  • exceptions generated within the pipeline will be handled by the invoker
  • - *
- * - * @return a function that will start the preprocessor and returns its outcome, or - * {@code null} if this operation needs no preprocessor - */ - protected CompletableFuture startPreprocessorAsync() { - return startGuardAsync(); - } + @Override + public CompletableFuture start() { + // allocate a controller for the entire operation + final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - /** - * Invokes the operation's guard step(s) as a "future". This method simply returns - * {@code null}. - *

- * This method assumes the following: - *

    - *
  • the operator is alive
  • - *
  • exceptions generated within the pipeline will be handled by the invoker
  • - *
- * - * @return a function that will start the guard checks and returns its outcome, or - * {@code null} if this operation has no guard - */ - protected CompletableFuture startGuardAsync() { - return null; + // start attempt #1 + return startOperationAttempt(controller, 1); } /** - * Starts the operation attempt, with no preprocessor. When all retries complete, it - * will complete the controller. + * Starts the operation attempt. When all retries complete, it will complete the + * controller. * * @param controller controller for all operation attempts * @param attempt attempt number, typically starting with 1 @@ -233,6 +194,8 @@ public abstract class OperationPartial implements Operation { private CompletableFuture startOperationAttempt( PipelineControllerFuture controller, int attempt) { + generateSubRequestId(attempt); + // propagate "stop" to the operation attempt controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt)) .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); @@ -240,6 +203,17 @@ public abstract class OperationPartial implements Operation { return controller; } + /** + * Generates and sets {@link #subRequestId} to a new subrequest ID. + * + * @param attempt attempt number, typically starting with 1 + */ + public void generateSubRequestId(int attempt) { + // Note: this should be "protected", but that makes junits much messier + + setSubRequestId(UUID.randomUUID().toString()); + } + /** * Starts the operation attempt, without doing any retries. * @@ -252,7 +226,7 @@ public abstract class OperationPartial implements Operation { logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId()); final Executor executor = params.getExecutor(); - final OperationOutcome outcome = params.makeOutcome(); + final OperationOutcome outcome = params.makeOutcome(getTargetEntity()); final CallbackManager callbacks = new CallbackManager(); // this operation attempt gets its own controller @@ -300,7 +274,7 @@ public abstract class OperationPartial implements Operation { * @return {@code true} if the outcome was successful */ protected boolean isSuccess(OperationOutcome outcome) { - return (outcome.getResult() == PolicyResult.SUCCESS); + return (outcome != null && outcome.getResult() == OperationResult.SUCCESS); } /** @@ -311,7 +285,7 @@ public abstract class OperationPartial implements Operation { * and was associated with this operator, {@code false} otherwise */ protected boolean isActorFailed(OperationOutcome outcome) { - return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE); + return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE); } /** @@ -376,35 +350,47 @@ public abstract class OperationPartial implements Operation { */ private Function setRetryFlag(int attempt) { - return operation -> { - if (operation != null && !isActorFailed(operation)) { - /* - * wrong type or wrong operation - just leave it as is. No need to log - * anything here, as retryOnFailure() will log a message - */ - return operation; + return origOutcome -> { + // ensure we have a non-null outcome + OperationOutcome outcome; + if (origOutcome != null) { + outcome = origOutcome; + } else { + logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId()); + outcome = this.setOutcome(params.makeOutcome(getTargetEntity()), OperationResult.FAILURE); } - // get a non-null operation - OperationOutcome oper2; - if (operation != null) { - oper2 = operation; - } else { - oper2 = params.makeOutcome(); - oper2.setResult(PolicyResult.FAILURE); + // ensure correct actor/operation + outcome.setActor(getActorName()); + outcome.setOperation(getName()); + + // determine if we should retry, based on the result + if (outcome.getResult() != OperationResult.FAILURE) { + // do not retry success or other failure types (e.g., exception) + outcome.setFinalOutcome(true); + return outcome; } int retry = getRetry(params.getRetry()); - if (retry > 0 && attempt > retry) { + if (retry <= 0) { + // no retries were specified + outcome.setFinalOutcome(true); + + } else if (attempt <= retry) { + // have more retries - not the final outcome + outcome.setFinalOutcome(false); + + } else { /* * retries were specified and we've already tried them all - change to * FAILURE_RETRIES */ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId()); - oper2.setResult(PolicyResult.FAILURE_RETRIES); + outcome.setResult(OperationResult.FAILURE_RETRIES); + outcome.setFinalOutcome(true); } - return oper2; + return outcome; }; } @@ -470,10 +456,16 @@ public abstract class OperationPartial implements Operation { private Function fromException(String type) { return thrown -> { - OperationOutcome outcome = params.makeOutcome(); + OperationOutcome outcome = params.makeOutcome(getTargetEntity()); - logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), - params.getRequestId(), thrown); + if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) { + // do not include exception in the message, as it just clutters the log + logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), + params.getRequestId()); + } else { + logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(), + params.getRequestId(), thrown); + } return setOutcome(outcome, thrown); }; @@ -491,7 +483,7 @@ public abstract class OperationPartial implements Operation { * created. If this future is canceled, then all of the futures will be * canceled */ - protected CompletableFuture anyOf( + public CompletableFuture anyOf( @SuppressWarnings("unchecked") Supplier>... futureMakers) { return anyOf(Arrays.asList(futureMakers)); @@ -510,8 +502,7 @@ public abstract class OperationPartial implements Operation { * canceled. Similarly, when this future completes, any incomplete futures * will be canceled */ - protected CompletableFuture anyOf( - List>> futureMakers) { + public CompletableFuture anyOf(List>> futureMakers) { PipelineControllerFuture controller = new PipelineControllerFuture<>(); @@ -544,7 +535,7 @@ public abstract class OperationPartial implements Operation { * created. If this future is canceled, then all of the futures will be * canceled */ - protected CompletableFuture allOf( + public CompletableFuture allOf( @SuppressWarnings("unchecked") Supplier>... futureMakers) { return allOf(Arrays.asList(futureMakers)); @@ -562,8 +553,7 @@ public abstract class OperationPartial implements Operation { * canceled. Similarly, when this future completes, any incomplete futures * will be canceled */ - protected CompletableFuture allOf( - List>> futureMakers) { + public CompletableFuture allOf(List>> futureMakers) { PipelineControllerFuture controller = new PipelineControllerFuture<>(); Queue outcomes = new LinkedList<>(); @@ -721,7 +711,7 @@ public abstract class OperationPartial implements Operation { * @param futureMakers functions to make the futures * @return a future to cancel the sequence or await the outcome */ - protected CompletableFuture sequence( + public CompletableFuture sequence( @SuppressWarnings("unchecked") Supplier>... futureMakers) { return sequence(Arrays.asList(futureMakers)); @@ -736,7 +726,7 @@ public abstract class OperationPartial implements Operation { * @return a future to cancel the sequence or await the outcome, or {@code null} if * there were no tasks to perform */ - protected CompletableFuture sequence( + public CompletableFuture sequence( List>> futureMakers) { Queue>> queue = new ArrayDeque<>(futureMakers); @@ -761,7 +751,7 @@ public abstract class OperationPartial implements Operation { // @formatter:off controller.wrap(nextTask) - .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor) + .thenCompose(nextTaskOnSuccess(controller, queue)) .whenCompleteAsync(controller.delayedComplete(), executor); // @formatter:on @@ -795,7 +785,7 @@ public abstract class OperationPartial implements Operation { // @formatter:off return controller .wrap(nextTask) - .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor()); + .thenCompose(nextTaskOnSuccess(controller, taskQueue)); // @formatter:on }; } @@ -830,15 +820,19 @@ public abstract class OperationPartial implements Operation { * @param callbacks used to determine if the start callback can be invoked * @return a function that sets the start time and invokes the callback */ - private BiConsumer callbackStarted(CallbackManager callbacks) { + protected BiConsumer callbackStarted(CallbackManager callbacks) { return (outcome, thrown) -> { if (callbacks.canStart()) { - // haven't invoked "start" callback yet + outcome.setSubRequestId(getSubRequestId()); outcome.setStart(callbacks.getStartTime()); outcome.setEnd(null); - params.callbackStarted(outcome); + + // pass a copy to the callback + OperationOutcome outcome2 = new OperationOutcome(outcome); + outcome2.setFinalOutcome(false); + params.callbackStarted(outcome2); } }; } @@ -856,14 +850,16 @@ public abstract class OperationPartial implements Operation { * @param callbacks used to determine if the end callback can be invoked * @return a function that sets the end time and invokes the callback */ - private BiConsumer callbackCompleted(CallbackManager callbacks) { + protected BiConsumer callbackCompleted(CallbackManager callbacks) { return (outcome, thrown) -> { - if (callbacks.canEnd()) { + outcome.setSubRequestId(getSubRequestId()); outcome.setStart(callbacks.getStartTime()); outcome.setEnd(callbacks.getEndTime()); - params.callbackCompleted(outcome); + + // pass a copy to the callback + params.callbackCompleted(new OperationOutcome(outcome)); } }; } @@ -874,8 +870,9 @@ public abstract class OperationPartial implements Operation { * @param operation operation to be updated * @return the updated operation */ - protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) { - PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION); + public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) { + OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT + : OperationResult.FAILURE_EXCEPTION); return setOutcome(operation, result); } @@ -886,10 +883,10 @@ public abstract class OperationPartial implements Operation { * @param result result of the operation * @return the updated operation */ - public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) { + public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) { logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId()); operation.setResult(result); - operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG + operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG : ControlLoopOperation.FAILED_MSG); return operation; @@ -910,30 +907,24 @@ public abstract class OperationPartial implements Operation { } /** - * Logs a response. If the response is not of type, String, then it attempts to + * Logs a message. If the message is not of type, String, then it attempts to * pretty-print it into JSON before logging. * * @param direction IN or OUT * @param infra communication infrastructure on which it was published * @param source source name (e.g., the URL or Topic name) - * @param response response to be logged + * @param message message to be logged * @return the JSON text that was logged */ - public String logMessage(EventType direction, CommInfrastructure infra, String source, T response) { + public String logMessage(EventType direction, CommInfrastructure infra, String source, T message) { String json; try { - if (response == null) { - json = null; - } else if (response instanceof String) { - json = response.toString(); - } else { - json = makeCoder().encode(response, true); - } + json = prettyPrint(message); - } catch (CoderException e) { + } catch (IllegalArgumentException e) { String type = (direction == EventType.IN ? "response" : "request"); logger.warn("cannot pretty-print {}", type, e); - json = response.toString(); + json = message.toString(); } logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json); @@ -941,6 +932,28 @@ public abstract class OperationPartial implements Operation { return json; } + /** + * Converts a message to a "pretty-printed" String using the operation's normal + * serialization provider (i.e., it's coder). + * + * @param message response to be logged + * @return the JSON text that was logged + * @throws IllegalArgumentException if the message cannot be converted + */ + public String prettyPrint(T message) { + if (message == null) { + return null; + } else if (message instanceof String) { + return message.toString(); + } else { + try { + return getCoder().encode(message, true); + } catch (CoderException e) { + throw new IllegalArgumentException("cannot encode message", e); + } + } + } + // these may be overridden by subclasses or junit tests /** @@ -962,6 +975,16 @@ public abstract class OperationPartial implements Operation { return DEFAULT_RETRY_WAIT_MS; } + /** + * Gets the target entity, first trying the properties and then the parameters. + * + * @return the target entity + */ + protected String getTargetEntity() { + String targetEntity = getProperty(OperationProperties.AAI_TARGET_ENTITY); + return (targetEntity != null ? targetEntity : params.getTargetEntity()); + } + /** * Gets the operation timeout. * @@ -976,7 +999,7 @@ public abstract class OperationPartial implements Operation { // these may be overridden by junit tests - protected Coder makeCoder() { + protected Coder getCoder() { return coder; } }