import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
+import lombok.AccessLevel;
import lombok.Getter;
+import lombok.Setter;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.OperationProperties;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
* returned by overridden methods will do the same. Of course, if a class overrides
* {@link #doOperation(int, OperationOutcome) doOperation()}, then there's little that can
* be done to cancel that particular operation.
+ * <p/>
+ * In general tasks in a pipeline are executed by the same thread. However, the following
+ * should always be executed via the executor specified in "params":
+ * <ul>
+ * <li>start callback</li>
+ * <li>completion callback</li>
+ * <li>controller completion (i.e., delayedComplete())</li>
+ * </ul>
*/
public abstract class OperationPartial implements Operation {
private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
@Getter
private final String fullName;
+ @Getter
+ @Setter(AccessLevel.PROTECTED)
+ private String subRequestId;
+
+ @Getter
+ private final List<String> propertyNames;
+
+ /**
+ * Values for the properties identified by {@link #getPropertyNames()}.
+ */
+ private final Map<String, Object> properties = new HashMap<>();
+
/**
* Constructs the object.
*
* @param params operation parameters
* @param config configuration for this operation
+ * @param propertyNames names of properties required by this operation
*/
- public OperationPartial(ControlLoopOperationParams params, OperatorConfig config) {
+ public OperationPartial(ControlLoopOperationParams params, OperatorConfig config, List<String> propertyNames) {
this.params = params;
this.config = config;
this.fullName = params.getActor() + "." + params.getOperation();
+ this.propertyNames = propertyNames;
}
public Executor getBlockingExecutor() {
}
@Override
- public final CompletableFuture<OperationOutcome> start() {
+ public boolean containsProperty(String name) {
+ return properties.containsKey(name);
+ }
+
+ @Override
+ public void setProperty(String name, Object value) {
+ logger.info("{}: set property {}={}", getFullName(), name, value);
+ properties.put(name, value);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T getProperty(String name) {
+ return (T) properties.get(name);
+ }
+
+ @Override
+ public CompletableFuture<OperationOutcome> start() {
// allocate a controller for the entire operation
final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
return outcome -> {
- if (outcome != null && isSuccess(outcome)) {
+ if (isSuccess(outcome)) {
logger.info("{}: preprocessor succeeded for {}", getFullName(), params.getRequestId());
return CompletableFuture.completedFuture(outcome);
}
// propagate "stop" to the callbacks
controller.add(callbacks);
- final OperationOutcome outcome2 = params.makeOutcome();
+ final OperationOutcome outcome2 = params.makeOutcome(getTargetEntity());
// TODO need a FAILURE_MISSING_DATA (e.g., A&AI)
* {@code null} if this operation has no guard
*/
protected CompletableFuture<OperationOutcome> startGuardAsync() {
- // get the guard payload
- Map<String, Object> guardPayload = makeGuardPayload();
-
- // wrap it in a "resource"
- Map<String, Object> resource = new LinkedHashMap<>();
- resource.put("guard", guardPayload);
+ if (params.isPreprocessed()) {
+ return null;
+ }
- Map<String, Object> payload = new LinkedHashMap<>();
- payload.put("resource", resource);
+ // get the guard payload
+ Map<String, Object> payload = makeGuardPayload();
/*
* Note: can't use constants from actor.guard, because that would create a
* @return a new guard payload
*/
protected Map<String, Object> makeGuardPayload() {
+ // TODO delete this once preprocessing is done by the application
Map<String, Object> guard = new LinkedHashMap<>();
guard.put("actor", params.getActor());
- guard.put("recipe", params.getOperation());
- guard.put("target", params.getTargetEntity());
+ guard.put("operation", params.getOperation());
+ guard.put("target", getTargetEntity());
guard.put("requestId", params.getRequestId());
String clname = params.getContext().getEvent().getClosedLoopControlName();
private CompletableFuture<OperationOutcome> startOperationAttempt(
PipelineControllerFuture<OperationOutcome> controller, int attempt) {
+ generateSubRequestId(attempt);
+
// propagate "stop" to the operation attempt
controller.wrap(startAttemptWithoutRetries(attempt)).thenCompose(retryOnFailure(controller, attempt))
.whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
return controller;
}
+ /**
+ * Generates and sets {@link #subRequestId} to a new subrequest ID.
+ *
+ * @param attempt attempt number, typically starting with 1
+ */
+ public void generateSubRequestId(int attempt) {
+ // Note: this should be "protected", but that makes junits much messier
+
+ setSubRequestId(UUID.randomUUID().toString());
+ }
+
/**
* Starts the operation attempt, without doing any retries.
*
logger.info("{}: start operation attempt {} for {}", getFullName(), attempt, params.getRequestId());
final Executor executor = params.getExecutor();
- final OperationOutcome outcome = params.makeOutcome();
+ final OperationOutcome outcome = params.makeOutcome(getTargetEntity());
final CallbackManager callbacks = new CallbackManager();
// this operation attempt gets its own controller
* @return {@code true} if the outcome was successful
*/
protected boolean isSuccess(OperationOutcome outcome) {
- return (outcome.getResult() == PolicyResult.SUCCESS);
+ return (outcome != null && outcome.getResult() == PolicyResult.SUCCESS);
}
/**
outcome = origOutcome;
} else {
logger.warn("{}: null outcome; treating as a failure for {}", getFullName(), params.getRequestId());
- outcome = this.setOutcome(params.makeOutcome(), PolicyResult.FAILURE);
+ outcome = this.setOutcome(params.makeOutcome(getTargetEntity()), PolicyResult.FAILURE);
}
// ensure correct actor/operation
private Function<Throwable, OperationOutcome> fromException(String type) {
return thrown -> {
- OperationOutcome outcome = params.makeOutcome();
+ OperationOutcome outcome = params.makeOutcome(getTargetEntity());
- logger.warn("exception throw by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
- params.getRequestId(), thrown);
+ if (thrown instanceof CancellationException || thrown.getCause() instanceof CancellationException) {
+ // do not include exception in the message, as it just clutters the log
+ logger.warn("{} canceled {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+ params.getRequestId());
+ } else {
+ logger.warn("exception thrown by {} {}.{} for {}", type, outcome.getActor(), outcome.getOperation(),
+ params.getRequestId(), thrown);
+ }
return setOutcome(outcome, thrown);
};
* created. If this future is canceled, then all of the futures will be
* canceled
*/
- protected CompletableFuture<OperationOutcome> anyOf(
+ public CompletableFuture<OperationOutcome> anyOf(
@SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
return anyOf(Arrays.asList(futureMakers));
* canceled. Similarly, when this future completes, any incomplete futures
* will be canceled
*/
- protected CompletableFuture<OperationOutcome> anyOf(
- List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+ public CompletableFuture<OperationOutcome> anyOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
* created. If this future is canceled, then all of the futures will be
* canceled
*/
- protected CompletableFuture<OperationOutcome> allOf(
+ public CompletableFuture<OperationOutcome> allOf(
@SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
return allOf(Arrays.asList(futureMakers));
* canceled. Similarly, when this future completes, any incomplete futures
* will be canceled
*/
- protected CompletableFuture<OperationOutcome> allOf(
- List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+ public CompletableFuture<OperationOutcome> allOf(List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
Queue<OperationOutcome> outcomes = new LinkedList<>();
* @param futureMakers functions to make the futures
* @return a future to cancel the sequence or await the outcome
*/
- protected CompletableFuture<OperationOutcome> sequence(
+ public CompletableFuture<OperationOutcome> sequence(
@SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
return sequence(Arrays.asList(futureMakers));
* @return a future to cancel the sequence or await the outcome, or {@code null} if
* there were no tasks to perform
*/
- protected CompletableFuture<OperationOutcome> sequence(
+ public CompletableFuture<OperationOutcome> sequence(
List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
// @formatter:off
controller.wrap(nextTask)
- .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
+ .thenCompose(nextTaskOnSuccess(controller, queue))
.whenCompleteAsync(controller.delayedComplete(), executor);
// @formatter:on
// @formatter:off
return controller
.wrap(nextTask)
- .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
+ .thenCompose(nextTaskOnSuccess(controller, taskQueue));
// @formatter:on
};
}
* @param callbacks used to determine if the start callback can be invoked
* @return a function that sets the start time and invokes the callback
*/
- private BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
+ protected BiConsumer<OperationOutcome, Throwable> callbackStarted(CallbackManager callbacks) {
return (outcome, thrown) -> {
if (callbacks.canStart()) {
+ outcome.setSubRequestId(getSubRequestId());
outcome.setStart(callbacks.getStartTime());
outcome.setEnd(null);
* @param callbacks used to determine if the end callback can be invoked
* @return a function that sets the end time and invokes the callback
*/
- private BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
+ protected BiConsumer<OperationOutcome, Throwable> callbackCompleted(CallbackManager callbacks) {
return (outcome, thrown) -> {
if (callbacks.canEnd()) {
+ outcome.setSubRequestId(getSubRequestId());
outcome.setStart(callbacks.getStartTime());
outcome.setEnd(callbacks.getEndTime());
* @param operation operation to be updated
* @return the updated operation
*/
- protected OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
+ public OperationOutcome setOutcome(OperationOutcome operation, Throwable thrown) {
PolicyResult result = (isTimeout(thrown) ? PolicyResult.FAILURE_TIMEOUT : PolicyResult.FAILURE_EXCEPTION);
return setOutcome(operation, result);
}
}
/**
- * Logs a response. If the response is not of type, String, then it attempts to
+ * Logs a message. If the message is not of type, String, then it attempts to
* pretty-print it into JSON before logging.
*
* @param direction IN or OUT
* @param infra communication infrastructure on which it was published
* @param source source name (e.g., the URL or Topic name)
- * @param response response to be logged
+ * @param message message to be logged
* @return the JSON text that was logged
*/
- public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
+ public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T message) {
String json;
try {
- if (response == null) {
- json = null;
- } else if (response instanceof String) {
- json = response.toString();
- } else {
- json = makeCoder().encode(response, true);
- }
+ json = prettyPrint(message);
- } catch (CoderException e) {
+ } catch (IllegalArgumentException e) {
String type = (direction == EventType.IN ? "response" : "request");
logger.warn("cannot pretty-print {}", type, e);
- json = response.toString();
+ json = message.toString();
}
logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
return json;
}
+ /**
+ * Converts a message to a "pretty-printed" String using the operation's normal
+ * serialization provider (i.e., it's <i>coder</i>).
+ *
+ * @param message response to be logged
+ * @return the JSON text that was logged
+ * @throws IllegalArgumentException if the message cannot be converted
+ */
+ public <T> String prettyPrint(T message) {
+ if (message == null) {
+ return null;
+ } else if (message instanceof String) {
+ return message.toString();
+ } else {
+ try {
+ return getCoder().encode(message, true);
+ } catch (CoderException e) {
+ throw new IllegalArgumentException("cannot encode message", e);
+ }
+ }
+ }
+
// these may be overridden by subclasses or junit tests
/**
return DEFAULT_RETRY_WAIT_MS;
}
+ /**
+ * Gets the target entity, first trying the properties and then the parameters.
+ *
+ * @return the target entity
+ */
+ protected String getTargetEntity() {
+ String targetEntity = getProperty(OperationProperties.AAI_TARGET_ENTITY);
+ return (targetEntity != null ? targetEntity : params.getTargetEntity());
+ }
+
/**
* Gets the operation timeout.
*
// these may be overridden by junit tests
- protected Coder makeCoder() {
+ protected Coder getCoder() {
return coder;
}
}