X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=models-interactions%2Fmodel-actors%2FactorServiceProvider%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fcontrolloop%2Factorserviceprovider%2Fimpl%2FOperationPartial.java;h=c19ad6c319c9d5de425b1f2b710430ec14d0ab6f;hb=refs%2Fchanges%2F90%2F137190%2F1;hp=1df3187994a78ab19c5b847460ceacd08b8a1ef9;hpb=45e3d3ae1cc5a4c29526da4f4cb6096e1b6d7d4f;p=policy%2Fmodels.git diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java index 1df318799..c19ad6c31 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/impl/OperationPartial.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,7 @@ package org.onap.policy.controlloop.actorserviceprovider.impl; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; -import java.util.LinkedHashMap; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -52,10 +52,11 @@ import org.onap.policy.controlloop.ControlLoopOperation; import org.onap.policy.controlloop.actorserviceprovider.CallbackManager; import org.onap.policy.controlloop.actorserviceprovider.Operation; import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.OperationProperties; +import org.onap.policy.controlloop.actorserviceprovider.OperationResult; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig; import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; -import org.onap.policy.controlloop.policy.PolicyResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,9 +64,7 @@ import org.slf4j.LoggerFactory; * Partial implementation of an operator. In general, it's preferable that subclasses * would override {@link #startOperationAsync(int, OperationOutcome) * startOperationAsync()}. However, if that proves to be too difficult, then they can - * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition, - * if the operation requires any preprocessor steps, the subclass may choose to override - * {@link #startPreprocessorAsync()}. + * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. *

* The futures returned by the methods within this class can be canceled, and will * propagate the cancellation to any subtasks. Thus it is also expected that any futures @@ -103,173 +102,90 @@ public abstract class OperationPartial implements Operation { @Setter(AccessLevel.PROTECTED) private String subRequestId; + @Getter + private final List propertyNames; + + /** + * Values for the properties identified by {@link #getPropertyNames()}. + */ + private final Map properties = new HashMap<>(); + /** * Constructs the object. * * @param params operation parameters * @param config configuration for this operation + * @param propertyNames names of properties required by this operation */ - public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) { + protected OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List propertyNames) { this.params = params; this.config = config; this.fullName = params.getActor() + "." + params.getOperation(); + this.propertyNames = propertyNames; } public Executor getBlockingExecutor() { return config.getBlockingExecutor(); } + @Override public String getActorName() { return params.getActor(); } + @Override public String getName() { return params.getOperation(); } @Override - public CompletableFuture start() { - // allocate a controller for the entire operation - final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - - CompletableFuture preproc = startPreprocessorAsync(); - if (preproc == null) { - // no preprocessor required - just start the operation - return startOperationAttempt(controller, 1); - } - - /* - * Do preprocessor first and then, if successful, start the operation. Note: - * operations create their own outcome, ignoring the outcome from any previous - * steps. - * - * Wrap the preprocessor to ensure "stop" is propagated to it. - */ - // @formatter:off - controller.wrap(preproc) - .exceptionally(fromException("preprocessor of operation")) - .thenCompose(handlePreprocessorFailure(controller)) - .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1)) - .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); - // @formatter:on - - return controller; + public boolean containsProperty(String name) { + return properties.containsKey(name); } - /** - * Handles a failure in the preprocessor pipeline. If a failure occurred, then it - * invokes the call-backs, marks the controller complete, and returns an incomplete - * future, effectively halting the pipeline. Otherwise, it returns the outcome that it - * received. - *

- * Assumes that no callbacks have been invoked yet. - * - * @param controller pipeline controller - * @return a function that checks the outcome status and continues, if successful, or - * indicates a failure otherwise - */ - private Function> handlePreprocessorFailure( - PipelineControllerFuture controller) { - - return outcome -> { - - if (isSuccess(outcome)) { - logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId()); - return CompletableFuture.completedFuture(outcome); - } - - logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId()); - - final Executor executor = params.getExecutor(); - final CallbackManager callbacks = new CallbackManager(); - - // propagate "stop" to the callbacks - controller.add(callbacks); - - final OperationOutcome outcome2 = params.makeOutcome(); - - // TODO need a FAILURE_MISSING_DATA (e.g., A&AI) - - outcome2.setFinalOutcome(true); - outcome2.setResult(PolicyResult.FAILURE_GUARD); - outcome2.setMessage(outcome != null ? outcome.getMessage() : null); - - // @formatter:off - CompletableFuture.completedFuture(outcome2) - .whenCompleteAsync(callbackStarted(callbacks), executor) - .whenCompleteAsync(callbackCompleted(callbacks), executor) - .whenCompleteAsync(controller.delayedComplete(), executor); - // @formatter:on - - return new CompletableFuture<>(); - }; + @Override + public void setProperty(String name, Object value) { + logger.info("{}: set property {}={}", getFullName(), name, value); + properties.put(name, value); } - /** - * Invokes the operation's preprocessor step(s) as a "future". This method simply - * returns {@code null}. - *

- * This method assumes the following: - *

- * - * @return a function that will start the preprocessor and returns its outcome, or - * {@code null} if this operation needs no preprocessor - */ - protected CompletableFuture startPreprocessorAsync() { - return null; + @SuppressWarnings("unchecked") + @Override + public T getProperty(String name) { + return (T) properties.get(name); } /** - * Invokes the operation's guard step(s) as a "future". - *

- * This method assumes the following: - *

+ * Gets a property value, throwing an exception if it's missing. * - * @return a function that will start the guard checks and returns its outcome, or - * {@code null} if this operation has no guard + * @param name property name + * @param propertyType property type, used in an error message if the property value + * is {@code null} + * @return the property value */ - protected CompletableFuture startGuardAsync() { - // get the guard payload - Map payload = makeGuardPayload(); + @SuppressWarnings("unchecked") + public T getRequiredProperty(String name, String propertyType) { + T value = (T) properties.get(name); + if (value == null) { + throw new IllegalStateException("missing " + propertyType); + } - /* - * Note: can't use constants from actor.guard, because that would create a - * circular dependency. - */ - return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null) - .payload(payload).build().start(); + return value; } - /** - * Creates a payload to execute a guard operation. - * - * @return a new guard payload - */ - protected Map makeGuardPayload() { - Map guard = new LinkedHashMap<>(); - guard.put("actor", params.getActor()); - guard.put("operation", params.getOperation()); - guard.put("target", params.getTargetEntity()); - guard.put("requestId", params.getRequestId()); - - String clname = params.getContext().getEvent().getClosedLoopControlName(); - if (clname != null) { - guard.put("clname", clname); - } + @Override + public CompletableFuture start() { + // allocate a controller for the entire operation + final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - return guard; + // start attempt #1 + return startOperationAttempt(controller, 1); } /** - * Starts the operation attempt, with no preprocessor. When all retries complete, it - * will complete the controller. + * Starts the operation attempt. When all retries complete, it will complete the + * controller. * * @param controller controller for all operation attempts * @param attempt attempt number, typically starting with 1 @@ -289,6 +205,7 @@ public abstract class OperationPartial implements Operation { /** * Generates and sets {@link #subRequestId} to a new subrequest ID. + * * @param attempt attempt number, typically starting with 1 */ public void generateSubRequestId(int attempt) { @@ -300,7 +217,6 @@ public abstract class OperationPartial implements Operation { /** * Starts the operation attempt, without doing any retries. * - * @param params operation parameters * @param attempt attempt number, typically starting with 1 * @return a future that will return the result of a single operation attempt */ @@ -308,9 +224,9 @@ public abstract class OperationPartial implements Operation { logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId()); - final Executor executor = params.getExecutor(); - final OperationOutcome outcome = params.makeOutcome(); - final CallbackManager callbacks = new CallbackManager(); + final var executor = params.getExecutor(); + final var outcome = makeOutcome(); + final var callbacks = new CallbackManager(); // this operation attempt gets its own controller final PipelineControllerFuture controller = new PipelineControllerFuture<>(); @@ -357,7 +273,7 @@ public abstract class OperationPartial implements Operation { * @return {@code true} if the outcome was successful */ protected boolean isSuccess(OperationOutcome outcome) { - return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS); + return (outcome != null && outcome.getResult() == OperationResult.SUCCESS); } /** @@ -368,7 +284,7 @@ public abstract class OperationPartial implements Operation { * and was associated with this operator, {@code false} otherwise */ protected boolean isActorFailed(OperationOutcome outcome) { - return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE); + return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE); } /** @@ -440,7 +356,7 @@ public abstract class OperationPartial implements Operation { outcome = origOutcome; } else { logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId()); - outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE); + outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE); } // ensure correct actor/operation @@ -448,7 +364,7 @@ public abstract class OperationPartial implements Operation { outcome.setOperation(getName()); // determine if we should retry, based on the result - if (outcome.getResult() != PolicyResult.FAILURE) { + if (outcome.getResult() != OperationResult.FAILURE) { // do not retry success or other failure types (e.g., exception) outcome.setFinalOutcome(true); return outcome; @@ -465,11 +381,11 @@ public abstract class OperationPartial implements Operation { } else { /* - * retries were specified and we've already tried them all - change to + * retries were specified, and we've already tried them all - change to * FAILURE_RETRIES */ logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId()); - outcome.setResult(PolicyResult.FAILURE_RETRIES); + outcome.setResult(OperationResult.FAILURE_RETRIES); outcome.setFinalOutcome(true); } @@ -539,7 +455,7 @@ public abstract class OperationPartial implements Operation { private Function fromException(String type) { return thrown -> { - OperationOutcome outcome = params.makeOutcome(); + OperationOutcome outcome = makeOutcome(); if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) { // do not include exception in the message, as it just clutters the log @@ -601,7 +517,7 @@ public abstract class OperationPartial implements Operation { return futures[0]; } - CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome) + CompletableFuture.anyOf(futures).thenApply(OperationOutcome.class::cast) .whenCompleteAsync(controller.delayedComplete(), params.getExecutor()); return controller; @@ -677,14 +593,13 @@ public abstract class OperationPartial implements Operation { * @return an array of futures, possibly zero-length. If the array is of size one, * then that one item should be returned instead of the controller */ + @SuppressWarnings("unchecked") private CompletableFuture[] attachFutures(PipelineControllerFuture controller, List>> futureMakers, UnaryOperator> adorn) { if (futureMakers.isEmpty()) { - @SuppressWarnings("unchecked") - CompletableFuture[] result = new CompletableFuture[0]; - return result; + return new CompletableFuture[0]; } // the last, unadorned future that is created @@ -714,8 +629,7 @@ public abstract class OperationPartial implements Operation { } } - @SuppressWarnings("unchecked") - CompletableFuture[] result = new CompletableFuture[futures.size()]; + var result = new CompletableFuture[futures.size()]; if (result.length == 1) { // special case - return the unadorned future @@ -830,7 +744,7 @@ public abstract class OperationPartial implements Operation { * executing */ final PipelineControllerFuture controller = new PipelineControllerFuture<>(); - final Executor executor = params.getExecutor(); + final var executor = params.getExecutor(); // @formatter:off controller.wrap(nextTask) @@ -913,7 +827,7 @@ public abstract class OperationPartial implements Operation { outcome.setEnd(null); // pass a copy to the callback - OperationOutcome outcome2 = new OperationOutcome(outcome); + var outcome2 = new OperationOutcome(outcome); outcome2.setFinalOutcome(false); params.callbackStarted(outcome2); } @@ -954,7 +868,8 @@ public abstract class OperationPartial implements Operation { * @return the updated operation */ public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) { - PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION); + OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT + : OperationResult.FAILURE_EXCEPTION); return setOutcome(operation, result); } @@ -965,15 +880,27 @@ public abstract class OperationPartial implements Operation { * @param result result of the operation * @return the updated operation */ - public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) { + public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) { logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId()); operation.setResult(result); - operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG + operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG : ControlLoopOperation.FAILED_MSG); return operation; } + /** + * Makes an outcome, populating the "target" field with the contents of the target + * entity property. + * + * @return a new operation outcome + */ + protected OperationOutcome makeOutcome() { + OperationOutcome outcome = params.makeOutcome(); + outcome.setTarget(getProperty(OperationProperties.AAI_TARGET_ENTITY)); + return outcome; + } + /** * Determines if a throwable is due to a timeout. * @@ -1029,7 +956,7 @@ public abstract class OperationPartial implements Operation { return message.toString(); } else { try { - return makeCoder().encode(message, true); + return getCoder().encode(message, true); } catch (CoderException e) { throw new IllegalArgumentException("cannot encode message", e); } @@ -1071,7 +998,7 @@ public abstract class OperationPartial implements Operation { // these may be overridden by junit tests - protected Coder makeCoder() { + protected Coder getCoder() { return coder; } }