* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2020-2022 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedHashMap;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
+import org.onap.policy.controlloop.actorserviceprovider.OperationResult;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
-import org.onap.policy.controlloop.policy.PolicyResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Partial implementation of an operator. In general, it's preferable that subclasses
* would override {@link #startOperationAsync(int, OperationOutcome)
* startOperationAsync()}. However, if that proves to be too difficult, then they can
- * simply override {@link #doOperation(int, OperationOutcome) doOperation()}. In addition,
- * if the operation requires any preprocessor steps, the subclass may choose to override
- * {@link #startPreprocessorAsync()}.
+ * simply override {@link #doOperation(int, OperationOutcome) doOperation()}.
* <p/>
* The futures returned by the methods within this class can be canceled, and will
* propagate the cancellation to any subtasks. Thus it is also expected that any futures
@Getter
private final String fullName;
+ @Getter
+ @Setter(AccessLevel.PROTECTED)
+ private String subRequestId;
+
+ @Getter
+ private final List<String> propertyNames;
+
+ /**
+ * Values for the properties identified by {@link #getPropertyNames()}.
+ */
+ private final Map<String, Object> properties = new HashMap<>();
+
/**
* Constructs the object.
*
* @param params operation parameters
* @param config configuration for this operation
+ * @param propertyNames names of properties required by this operation
*/
- public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
+ protected OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
this.params = params;
this.config = config;
this.fullName = params.getActor() + "." + params.getOperation();
+ this.propertyNames = propertyNames;
}
public Executor getBlockingExecutor() {
return config.getBlockingExecutor();
}
+ @Override
public String getActorName() {
return params.getActor();
}
+ @Override
public String getName() {
return params.getOperation();
}
@Override
- public CompletableFuture<OperationOutcome> start() {
- // allocate a controller for the entire operation
- final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- CompletableFuture<OperationOutcome> preproc = startPreprocessorAsync();
- if (preproc == null) {
- // no preprocessor required - just start the operation
- return startOperationAttempt(controller, 1);
- }
-
- /*
- * Do preprocessor first and then, if successful, start the operation. Note:
- * operations create their own outcome, ignoring the outcome from any previous
- * steps.
- *
- * Wrap the preprocessor to ensure "stop" is propagated to it.
- */
- // @formatter:off
- controller.wrap(preproc)
- .exceptionally(fromException("preprocessor of operation"))
- .thenCompose(handlePreprocessorFailure(controller))
- .thenCompose(unusedOutcome -> startOperationAttempt(controller, 1))
- .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
- // @formatter:on
-
- return controller;
+ public boolean containsProperty(String name) {
+ return properties.containsKey(name);
}
- /**
- * Handles a failure in the preprocessor pipeline. If a failure occurred, then it
- * invokes the call-backs, marks the controller complete, and returns an incomplete
- * future, effectively halting the pipeline. Otherwise, it returns the outcome that it
- * received.
- * <p/>
- * Assumes that no callbacks have been invoked yet.
- *
- * @param controller pipeline controller
- * @return a function that checks the outcome status and continues, if successful, or
- * indicates a failure otherwise
- */
- private Function<OperationOutcome, CompletableFuture<OperationOutcome>> handlePreprocessorFailure(
- PipelineControllerFuture<OperationOutcome> controller) {
-
- return outcome -> {
-
- if (isSuccess(outcome)) {
- logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
- return CompletableFuture.completedFuture(outcome);
- }
-
- logger.warn("preprocessor failed, discontinuing operation {} for {}", getFullName(), params.getRequestId());
-
- final Executor executor = params.getExecutor();
- final CallbackManager callbacks = new CallbackManager();
-
- // propagate "stop" to the callbacks
- controller.add(callbacks);
-
- final OperationOutcome outcome2 = params.makeOutcome();
-
- // TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
-
- outcome2.setFinalOutcome(true);
- outcome2.setResult(PolicyResult.FAILURE_GUARD);
- outcome2.setMessage(outcome != null ? outcome.getMessage() : null);
-
- // @formatter:off
- CompletableFuture.completedFuture(outcome2)
- .whenCompleteAsync(callbackStarted(callbacks), executor)
- .whenCompleteAsync(callbackCompleted(callbacks), executor)
- .whenCompleteAsync(controller.delayedComplete(), executor);
- // @formatter:on
-
- return new CompletableFuture<>();
- };
+ @Override
+ public void setProperty(String name, Object value) {
+ logger.info("{}: set property {}={}", getFullName(), name, value);
+ properties.put(name, value);
}
- /**
- * Invokes the operation's preprocessor step(s) as a "future". This method simply
- * returns {@code null}.
- * <p/>
- * This method assumes the following:
- * <ul>
- * <li>the operator is alive</li>
- * <li>exceptions generated within the pipeline will be handled by the invoker</li>
- * </ul>
- *
- * @return a function that will start the preprocessor and returns its outcome, or
- * {@code null} if this operation needs no preprocessor
- */
- protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
- return null;
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T getProperty(String name) {
+ return (T) properties.get(name);
}
/**
- * Invokes the operation's guard step(s) as a "future".
- * <p/>
- * This method assumes the following:
- * <ul>
- * <li>the operator is alive</li>
- * <li>exceptions generated within the pipeline will be handled by the invoker</li>
- * </ul>
+ * Gets a property value, throwing an exception if it's missing.
*
- * @return a function that will start the guard checks and returns its outcome, or
- * {@code null} if this operation has no guard
+ * @param name property name
+ * @param propertyType property type, used in an error message if the property value
+ * is {@code null}
+ * @return the property value
*/
- protected CompletableFuture<OperationOutcome> startGuardAsync() {
- // get the guard payload
- Map<String, Object> guardPayload = makeGuardPayload();
-
- // wrap it in a "resource"
- Map<String, Object> resource = new LinkedHashMap<>();
- resource.put("guard", guardPayload);
-
- Map<String, Object> payload = new LinkedHashMap<>();
- payload.put("resource", resource);
+ @SuppressWarnings("unchecked")
+ public <T> T getRequiredProperty(String name, String propertyType) {
+ T value = (T) properties.get(name);
+ if (value == null) {
+ throw new IllegalStateException("missing " + propertyType);
+ }
- /*
- * Note: can't use constants from actor.guard, because that would create a
- * circular dependency.
- */
- return params.toBuilder().actor(GUARD_ACTOR_NAME).operation(GUARD_OPERATION_NAME).retry(null).timeoutSec(null)
- .payload(payload).build().start();
+ return value;
}
- /**
- * Creates a payload to execute a guard operation.
- *
- * @return a new guard payload
- */
- protected Map<String, Object> makeGuardPayload() {
- Map<String, Object> guard = new LinkedHashMap<>();
- guard.put("actor", params.getActor());
- guard.put("recipe", params.getOperation());
- guard.put("target", params.getTargetEntity());
- guard.put("requestId", params.getRequestId());
-
- String clname = params.getContext().getEvent().getClosedLoopControlName();
- if (clname != null) {
- guard.put("clname", clname);
- }
+ @Override
+ public CompletableFuture<OperationOutcome> start() {
+ // allocate a controller for the entire operation
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- return guard;
+ // start attempt #1
+ return startOperationAttempt(controller, 1);
}
/**
- * Starts the operation attempt, with no preprocessor. When all retries complete, it
- * will complete the controller.
+ * Starts the operation attempt. When all retries complete, it will complete the
+ * controller.
*
* @param controller controller for all operation attempts
* @param attempt attempt number, typically starting with 1
private CompletableFuture<OperationOutcome> startOperationAttempt(
PipelineControllerFuture<OperationOutcome> controller, int attempt) {
+ generateSubRequestId(attempt);
+
// propagate "stop" to the operation attempt
controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
.whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
return controller;
}
+ /**
+ * Generates and sets {@link #subRequestId} to a new subrequest ID.
+ *
+ * @param attempt attempt number, typically starting with 1
+ */
+ public void generateSubRequestId(int attempt) {
+ // Note: this should be "protected", but that makes junits much messier
+
+ setSubRequestId(UUID.randomUUID().toString());
+ }
+
/**
* Starts the operation attempt, without doing any retries.
*
- * @param params operation parameters
* @param attempt attempt number, typically starting with 1
* @return a future that will return the result of a single operation attempt
*/
logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
- final Executor executor = params.getExecutor();
- final OperationOutcome outcome = params.makeOutcome();
- final CallbackManager callbacks = new CallbackManager();
+ final var executor = params.getExecutor();
+ final var outcome = makeOutcome();
+ final var callbacks = new CallbackManager();
// this operation attempt gets its own controller
final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
* @return {@code true} if the outcome was successful
*/
protected boolean isSuccess(OperationOutcome outcome) {
- return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
+ return (outcome != null && outcome.getResult() == OperationResult.SUCCESS);
}
/**
* <i>and</i> was associated with this operator, {@code false} otherwise
*/
protected boolean isActorFailed(OperationOutcome outcome) {
- return (isSameOperation(outcome) && outcome.getResult() == PolicyResult.FAILURE);
+ return (isSameOperation(outcome) && outcome.getResult() == OperationResult.FAILURE);
}
/**
outcome = origOutcome;
} else {
logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
- outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
+ outcome = this.setOutcome(makeOutcome(), OperationResult.FAILURE);
}
// ensure correct actor/operation
outcome.setOperation(getName());
// determine if we should retry, based on the result
- if (outcome.getResult() != PolicyResult.FAILURE) {
+ if (outcome.getResult() != OperationResult.FAILURE) {
// do not retry success or other failure types (e.g., exception)
outcome.setFinalOutcome(true);
return outcome;
} else {
/*
- * retries were specified and we've already tried them all - change to
+ * retries were specified, and we've already tried them all - change to
* FAILURE_RETRIES
*/
logger.info("operation {} retries exhausted for {}", getFullName(), params.getRequestId());
- outcome.setResult(PolicyResult.FAILURE_RETRIES);
+ outcome.setResult(OperationResult.FAILURE_RETRIES);
outcome.setFinalOutcome(true);
}
private Function<Throwable, OperationOutcome> fromException(String type) {
return thrown -> {
- OperationOutcome outcome = params.makeOutcome();
+ OperationOutcome outcome = makeOutcome();
if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
// do not include exception in the message, as it just clutters the log
return futures[0];
}
- CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
+ CompletableFuture.anyOf(futures).thenApply(OperationOutcome.class::cast)
.whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
return controller;
* @return an array of futures, possibly zero-length. If the array is of size one,
* then that one item should be returned instead of the controller
*/
+ @SuppressWarnings("unchecked")
private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
if (futureMakers.isEmpty()) {
- @SuppressWarnings("unchecked")
- CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
- return result;
+ return new CompletableFuture[0];
}
// the last, unadorned future that is created
}
}
- @SuppressWarnings("unchecked")
- CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
+ var result = new CompletableFuture[futures.size()];
if (result.length == 1) {
// special case - return the unadorned future
* executing
*/
final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- final Executor executor = params.getExecutor();
+ final var executor = params.getExecutor();
// @formatter:off
controller.wrap(nextTask)
* @param callbacks used to determine if the start callback can be invoked
* @return a function that sets the start time and invokes the callback
*/
- private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
+ protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
return (outcome, thrown) -> {
if (callbacks.canStart()) {
+ outcome.setSubRequestId(getSubRequestId());
outcome.setStart(callbacks.getStartTime());
outcome.setEnd(null);
// pass a copy to the callback
- OperationOutcome outcome2 = new OperationOutcome(outcome);
+ var outcome2 = new OperationOutcome(outcome);
outcome2.setFinalOutcome(false);
params.callbackStarted(outcome2);
}
* @param callbacks used to determine if the end callback can be invoked
* @return a function that sets the end time and invokes the callback
*/
- private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
+ protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
return (outcome, thrown) -> {
if (callbacks.canEnd()) {
+ outcome.setSubRequestId(getSubRequestId());
outcome.setStart(callbacks.getStartTime());
outcome.setEnd(callbacks.getEndTime());
* @return the updated operation
*/
public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
- PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
+ OperationResult result = (isTimeout(thrown) ? OperationResult.FAILURE_TIMEOUT
+ : OperationResult.FAILURE_EXCEPTION);
return setOutcome(operation, result);
}
* @param result result of the operation
* @return the updated operation
*/
- public OperationOutcome setOutcome(OperationOutcome operation, PolicyResult result) {
+ public OperationOutcome setOutcome(OperationOutcome operation, OperationResult result) {
logger.trace("{}: set outcome {} for {}", getFullName(), result, params.getRequestId());
operation.setResult(result);
- operation.setMessage(result == PolicyResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
+ operation.setMessage(result == OperationResult.SUCCESS ? ControlLoopOperation.SUCCESS_MSG
: ControlLoopOperation.FAILED_MSG);
return operation;
}
+ /**
+ * Makes an outcome, populating the "target" field with the contents of the target
+ * entity property.
+ *
+ * @return a new operation outcome
+ */
+ protected OperationOutcome makeOutcome() {
+ OperationOutcome outcome = params.makeOutcome();
+ outcome.setTarget(getProperty(OperationProperties.AAI_TARGET_ENTITY));
+ return outcome;
+ }
+
/**
* Determines if a throwable is due to a timeout.
*
}
/**
- * Logs a response. If the response is not of type, String, then it attempts to
+ * Logs a message. If the message is not of type, String, then it attempts to
* pretty-print it into JSON before logging.
*
* @param direction IN or OUT
* @param infra communication infrastructure on which it was published
* @param source source name (e.g., the URL or Topic name)
- * @param response response to be logged
+ * @param message message to be logged
* @return the JSON text that was logged
*/
- public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
+ public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
String json;
try {
- if (response == null) {
- json = null;
- } else if (response instanceof String) {
- json = response.toString();
- } else {
- json = makeCoder().encode(response, true);
- }
+ json = prettyPrint(message);
- } catch (CoderException e) {
+ } catch (IllegalArgumentException e) {
String type = (direction == EventType.IN ? "response" : "request");
logger.warn("cannot pretty-print {}", type, e);
- json = response.toString();
+ json = message.toString();
}
logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
return json;
}
+ /**
+ * Converts a message to a "pretty-printed" String using the operation's normal
+ * serialization provider (i.e., it's <i>coder</i>).
+ *
+ * @param message response to be logged
+ * @return the JSON text that was logged
+ * @throws IllegalArgumentException if the message cannot be converted
+ */
+ public <T> String prettyPrint(T message) {
+ if (message == null) {
+ return null;
+ } else if (message instanceof String) {
+ return message.toString();
+ } else {
+ try {
+ return getCoder().encode(message, true);
+ } catch (CoderException e) {
+ throw new IllegalArgumentException("cannot encode message", e);
+ }
+ }
+ }
+
// these may be overridden by subclasses or junit tests
/**
// these may be overridden by junit tests
- protected Coder makeCoder() {
+ protected Coder getCoder() {
return coder;
}
}