package org.onap.policy.controlloop.actorserviceprovider.impl;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil;
+import org.onap.policy.common.endpoints.utils.NetLoggerUtil.EventType;
+import org.onap.policy.common.utils.coder.Coder;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
import org.onap.policy.controlloop.ControlLoopOperation;
import org.onap.policy.controlloop.actorserviceprovider.CallbackManager;
import org.onap.policy.controlloop.actorserviceprovider.Operation;
* be done to cancel that particular operation.
*/
public abstract class OperationPartial implements Operation {
-
private static final Logger logger = LoggerFactory.getLogger(OperationPartial.class);
+ private static final Coder coder = new StandardCoder();
public static final long DEFAULT_RETRY_WAIT_MS = 1000L;
// values extracted from the operator
* Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
* any outstanding futures when one completes.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled
*/
- protected CompletableFuture<OperationOutcome> anyOf(List<CompletableFuture<OperationOutcome>> futures) {
-
- // convert list to an array
- @SuppressWarnings("rawtypes")
- CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+ protected CompletableFuture<OperationOutcome> anyOf(
+ @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
- @SuppressWarnings("unchecked")
- CompletableFuture<OperationOutcome> result = anyOf(arrFutures);
- return result;
+ return anyOf(Arrays.asList(futureMakers));
}
/**
- * Same as {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels any
- * outstanding futures when one completes.
+ * Similar to {@link CompletableFuture#anyOf(CompletableFuture...)}, but it cancels
+ * any outstanding futures when one completes.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled. Similarly, when this future completes, any incomplete futures
+ * will be canceled
*/
protected CompletableFuture<OperationOutcome> anyOf(
- @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+ List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ CompletableFuture<OperationOutcome>[] futures =
+ attachFutures(controller, futureMakers, UnaryOperator.identity());
+
+ if (futures.length == 0) {
+ // no futures were started
+ return null;
+ }
if (futures.length == 1) {
return futures[0];
}
- final Executor executor = params.getExecutor();
- final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- attachFutures(controller, futures);
-
- // @formatter:off
- CompletableFuture.anyOf(futures)
- .thenApply(object -> (OperationOutcome) object)
- .whenCompleteAsync(controller.delayedComplete(), executor);
- // @formatter:on
+ CompletableFuture.anyOf(futures).thenApply(outcome -> (OperationOutcome) outcome)
+ .whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
return controller;
}
/**
- * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels
- * the futures if returned future is canceled. The future returns the "worst" outcome,
- * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled
*/
- protected CompletableFuture<OperationOutcome> allOf(List<CompletableFuture<OperationOutcome>> futures) {
-
- // convert list to an array
- @SuppressWarnings("rawtypes")
- CompletableFuture[] arrFutures = futures.toArray(new CompletableFuture[futures.size()]);
+ protected CompletableFuture<OperationOutcome> allOf(
+ @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
- @SuppressWarnings("unchecked")
- CompletableFuture<OperationOutcome> result = allOf(arrFutures);
- return result;
+ return allOf(Arrays.asList(futureMakers));
}
/**
- * Same as {@link CompletableFuture#allOf(CompletableFuture...)}, but it cancels the
- * futures if returned future is canceled. The future returns the "worst" outcome,
- * based on priority (see {@link #detmPriority(OperationOutcome)}).
+ * Similar to {@link CompletableFuture#allOf(CompletableFuture...)}.
*
- * @param futures futures for which to wait
- * @return a future to cancel or await an outcome. If this future is canceled, then
- * all of the futures will be canceled
+ * @param futureMakers function to make a future. If the function returns
+ * {@code null}, then no future is created for that function. On the other
+ * hand, if the function throws an exception, then the previously created
+ * functions are canceled and the exception is re-thrown
+ * @return a future to cancel or await an outcome, or {@code null} if no futures were
+ * created. If this future is canceled, then all of the futures will be
+ * canceled. Similarly, when this future completes, any incomplete futures
+ * will be canceled
*/
protected CompletableFuture<OperationOutcome> allOf(
- @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
-
- if (futures.length == 1) {
- return futures[0];
- }
+ List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
- final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
-
- attachFutures(controller, futures);
+ Queue<OperationOutcome> outcomes = new LinkedList<>();
- OperationOutcome[] outcomes = new OperationOutcome[futures.length];
+ CompletableFuture<OperationOutcome>[] futures =
+ attachFutures(controller, futureMakers, future -> future.thenApply(outcome -> {
+ synchronized (outcomes) {
+ outcomes.add(outcome);
+ }
+ return outcome;
+ }));
- @SuppressWarnings("rawtypes")
- CompletableFuture[] futures2 = new CompletableFuture[futures.length];
+ if (futures.length == 0) {
+ // no futures were started
+ return null;
+ }
- // record the outcomes of each future when it completes
- for (int count = 0; count < futures2.length; ++count) {
- final int count2 = count;
- futures2[count] = futures[count].whenComplete((outcome2, thrown) -> outcomes[count2] = outcome2);
+ if (futures.length == 1) {
+ return futures[0];
}
// @formatter:off
- CompletableFuture.allOf(futures2)
+ CompletableFuture.allOf(futures)
.thenApply(unused -> combineOutcomes(outcomes))
.whenCompleteAsync(controller.delayedComplete(), params.getExecutor());
// @formatter:on
}
/**
- * Attaches the given futures to the controller.
+ * Invokes the functions to create the futures and attaches them to the controller.
*
* @param controller master controller for all of the futures
- * @param futures futures to be attached to the controller
- */
- private void attachFutures(PipelineControllerFuture<OperationOutcome> controller,
- @SuppressWarnings("unchecked") CompletableFuture<OperationOutcome>... futures) {
+ * @param futureMakers futures to be attached to the controller
+ * @param adorn function that "adorns" the future, possible adding onto its pipeline.
+ * Returns the adorned future
+ * @return an array of futures, possibly zero-length. If the array is of size one,
+ * then that one item should be returned instead of the controller
+ */
+ private CompletableFuture<OperationOutcome>[] attachFutures(PipelineControllerFuture<OperationOutcome> controller,
+ List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers,
+ UnaryOperator<CompletableFuture<OperationOutcome>> adorn) {
+
+ if (futureMakers.isEmpty()) {
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome>[] result = new CompletableFuture[0];
+ return result;
+ }
- if (futures.length == 0) {
- throw new IllegalArgumentException("empty list of futures");
+ // the last, unadorned future that is created
+ CompletableFuture<OperationOutcome> lastFuture = null;
+
+ List<CompletableFuture<OperationOutcome>> futures = new ArrayList<>(futureMakers.size());
+
+ // make each future
+ for (var maker : futureMakers) {
+ try {
+ CompletableFuture<OperationOutcome> future = maker.get();
+ if (future == null) {
+ continue;
+ }
+
+ // propagate "stop" to the future
+ controller.add(future);
+
+ futures.add(adorn.apply(future));
+
+ lastFuture = future;
+
+ } catch (RuntimeException e) {
+ logger.warn("{}: exception creating 'future' for {}", getFullName(), params.getRequestId());
+ controller.cancel(false);
+ throw e;
+ }
}
- // attach each task
- for (CompletableFuture<OperationOutcome> future : futures) {
- controller.add(future);
+ @SuppressWarnings("unchecked")
+ CompletableFuture<OperationOutcome>[] result = new CompletableFuture[futures.size()];
+
+ if (result.length == 1) {
+ // special case - return the unadorned future
+ result[0] = lastFuture;
+ return result;
}
+
+ return futures.toArray(result);
}
/**
* @param outcomes outcomes to be examined
* @return the combined outcome
*/
- private OperationOutcome combineOutcomes(OperationOutcome[] outcomes) {
+ private OperationOutcome combineOutcomes(Queue<OperationOutcome> outcomes) {
// identify the outcome with the highest priority
- OperationOutcome outcome = outcomes[0];
+ OperationOutcome outcome = outcomes.remove();
int priority = detmPriority(outcome);
- // start with "1", as we've already dealt with "0"
- for (int count = 1; count < outcomes.length; ++count) {
- OperationOutcome outcome2 = outcomes[count];
+ for (OperationOutcome outcome2 : outcomes) {
int priority2 = detmPriority(outcome2);
if (priority2 > priority) {
}
/**
- * Performs a task, after verifying that the controller is still running. Also checks
- * that the previous outcome was successful, if specified.
+ * Performs a sequence of tasks, stopping if a task fails. A given task's future is
+ * not created until the previous task completes. The pipeline returns the outcome of
+ * the last task executed.
*
- * @param controller overall pipeline controller
- * @param checkSuccess {@code true} to check the previous outcome, {@code false}
- * otherwise
- * @param outcome outcome of the previous task
- * @param task task to be performed
- * @return the task, if everything checks out. Otherwise, it returns an incomplete
- * future and completes the controller instead
+ * @param futureMakers functions to make the futures
+ * @return a future to cancel the sequence or await the outcome
*/
- // @formatter:off
- protected CompletableFuture<OperationOutcome> doTask(
- PipelineControllerFuture<OperationOutcome> controller,
- boolean checkSuccess, OperationOutcome outcome,
- CompletableFuture<OperationOutcome> task) {
- // @formatter:on
+ protected CompletableFuture<OperationOutcome> sequence(
+ @SuppressWarnings("unchecked") Supplier<CompletableFuture<OperationOutcome>>... futureMakers) {
- if (checkSuccess && !isSuccess(outcome)) {
- /*
- * must complete before canceling so that cancel() doesn't cause controller to
- * complete
- */
- controller.complete(outcome);
- task.cancel(false);
- return new CompletableFuture<>();
+ return sequence(Arrays.asList(futureMakers));
+ }
+
+ /**
+ * Performs a sequence of tasks, stopping if a task fails. A given task's future is
+ * not created until the previous task completes. The pipeline returns the outcome of
+ * the last task executed.
+ *
+ * @param futureMakers functions to make the futures
+ * @return a future to cancel the sequence or await the outcome, or {@code null} if
+ * there were no tasks to perform
+ */
+ protected CompletableFuture<OperationOutcome> sequence(
+ List<Supplier<CompletableFuture<OperationOutcome>>> futureMakers) {
+
+ Queue<Supplier<CompletableFuture<OperationOutcome>>> queue = new ArrayDeque<>(futureMakers);
+
+ CompletableFuture<OperationOutcome> nextTask = getNextTask(queue);
+ if (nextTask == null) {
+ // no tasks
+ return null;
+ }
+
+ if (queue.isEmpty()) {
+ // only one task - just return it rather than wrapping it in a controller
+ return nextTask;
}
- return controller.wrap(task);
+ /*
+ * multiple tasks - need a controller to stop whichever task is currently
+ * executing
+ */
+ final PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+ final Executor executor = params.getExecutor();
+
+ // @formatter:off
+ controller.wrap(nextTask)
+ .thenComposeAsync(nextTaskOnSuccess(controller, queue), executor)
+ .whenCompleteAsync(controller.delayedComplete(), executor);
+ // @formatter:on
+
+ return controller;
}
/**
- * Performs a task, after verifying that the controller is still running. Also checks
- * that the previous outcome was successful, if specified.
+ * Executes the next task in the queue, if the previous outcome was successful.
*
- * @param controller overall pipeline controller
- * @param checkSuccess {@code true} to check the previous outcome, {@code false}
- * otherwise
- * @param task function to start the task to be performed
- * @return a function to perform the task. If everything checks out, then it returns
- * the task. Otherwise, it returns an incomplete future and completes the
- * controller instead
+ * @param controller pipeline controller
+ * @param taskQueue queue of tasks to be performed
+ * @return a future to execute the remaining tasks, or the current outcome, if it's a
+ * failure, or if there are no more tasks
*/
- // @formatter:off
- protected Function<OperationOutcome, CompletableFuture<OperationOutcome>> doTask(
+ private Function<OperationOutcome, CompletableFuture<OperationOutcome>> nextTaskOnSuccess(
PipelineControllerFuture<OperationOutcome> controller,
- boolean checkSuccess,
- Function<OperationOutcome, CompletableFuture<OperationOutcome>> task) {
- // @formatter:on
+ Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
return outcome -> {
-
- if (!controller.isRunning()) {
- return new CompletableFuture<>();
+ if (!isSuccess(outcome)) {
+ // return the failure
+ return CompletableFuture.completedFuture(outcome);
}
- if (checkSuccess && !isSuccess(outcome)) {
- controller.complete(outcome);
- return new CompletableFuture<>();
+ CompletableFuture<OperationOutcome> nextTask = getNextTask(taskQueue);
+ if (nextTask == null) {
+ // no tasks - just return the success
+ return CompletableFuture.completedFuture(outcome);
}
- return controller.wrap(task.apply(outcome));
+ // @formatter:off
+ return controller
+ .wrap(nextTask)
+ .thenComposeAsync(nextTaskOnSuccess(controller, taskQueue), params.getExecutor());
+ // @formatter:on
};
}
+ /**
+ * Gets the next task from the queue, skipping those that are {@code null}.
+ *
+ * @param taskQueue task queue
+ * @return the next task, or {@code null} if the queue is now empty
+ */
+ private CompletableFuture<OperationOutcome> getNextTask(
+ Queue<Supplier<CompletableFuture<OperationOutcome>>> taskQueue) {
+
+ Supplier<CompletableFuture<OperationOutcome>> maker;
+
+ while ((maker = taskQueue.poll()) != null) {
+ CompletableFuture<OperationOutcome> future = maker.get();
+ if (future != null) {
+ return future;
+ }
+ }
+
+ return null;
+ }
+
/**
* Sets the start time of the operation and invokes the callback to indicate that the
* operation has started. Does nothing if the pipeline has been stopped.
return (thrown instanceof TimeoutException);
}
+ /**
+ * Logs a response. If the response is not of type, String, then it attempts to
+ * pretty-print it into JSON before logging.
+ *
+ * @param direction IN or OUT
+ * @param infra communication infrastructure on which it was published
+ * @param source source name (e.g., the URL or Topic name)
+ * @param response response to be logged
+ * @return the JSON text that was logged
+ */
+ public <T> String logMessage(EventType direction, CommInfrastructure infra, String source, T response) {
+ String json;
+ try {
+ if (response == null) {
+ json = null;
+ } else if (response instanceof String) {
+ json = response.toString();
+ } else {
+ json = makeCoder().encode(response, true);
+ }
+
+ } catch (CoderException e) {
+ String type = (direction == EventType.IN ? "response" : "request");
+ logger.warn("cannot pretty-print {}", type, e);
+ json = response.toString();
+ }
+
+ logger.info("[{}|{}|{}|]{}{}", direction, infra, source, NetLoggerUtil.SYSTEM_LS, json);
+
+ return json;
+ }
+
// these may be overridden by subclasses or junit tests
/**
protected long getTimeoutMs(Integer timeoutSec) {
return (timeoutSec == null ? 0 : TimeUnit.MILLISECONDS.convert(timeoutSec, TimeUnit.SECONDS));
}
+
+ // these may be overridden by junit tests
+
+ protected Coder makeCoder() {
+ return coder;
+ }
}