import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.function.Supplier;
import lombok.NoArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Pipeline controller, used by operations within the pipeline to determine if they should
- * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the
- * pipeline.
+ * continue to run. Whenever this is canceled or completed, it automatically cancels all
+ * futures and runs all listeners that have been added.
*/
@NoArgsConstructor
public class PipelineControllerFuture<T> extends CompletableFuture<T> {
private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
+ private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception";
+ private static final String CANCEL_MSG = "{}: cancel future";
+ private static final String COMPLETE_MSG = "{}: complete future";
+
/**
* Tracks items added to this controller via one of the <i>add</i> methods.
*/
private final FutureManager futures = new FutureManager();
- /**
- * Cancels and stops the pipeline, in that order.
- */
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- try {
- logger.trace("{}: cancel future", ident(this));
- return super.cancel(mayInterruptIfRunning);
+ return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this));
+ }
- } finally {
- futures.stop();
- }
+ @Override
+ public boolean complete(T value) {
+ return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this));
+ }
+
+ @Override
+ public boolean completeExceptionally(Throwable ex) {
+ return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this));
+ }
+
+ @Override
+ public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor) {
+ return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor);
+ }
+
+ @Override
+ public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
+ return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)));
+ }
+
+ @Override
+ public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) {
+ logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit);
+ return super.completeOnTimeout(value, timeout, unit);
+ }
+
+ @Override
+ public <U> PipelineControllerFuture<U> newIncompleteFuture() {
+ return new PipelineControllerFuture<>();
}
/**
*
* @return a function that removes the given future
*/
- public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) {
- return (value, thrown) -> {
- logger.trace("{}: remove future {}", ident(this), ident(future));
- remove(future);
- };
+ public <F> BiConsumer<F, Throwable> delayedRemove(Future<F> future) {
+ return (value, thrown) -> remove(future);
}
/**
*
* @return a function that removes the given listener
*/
- public BiConsumer<T, Throwable> delayedRemove(Runnable listener) {
- return (value, thrown) -> {
- logger.trace("{}: remove listener {}", ident(this), ident(listener));
- remove(listener);
- };
+ public <F> BiConsumer<F, Throwable> delayedRemove(Runnable listener) {
+ return (value, thrown) -> remove(listener);
}
/**
public BiConsumer<T, Throwable> delayedComplete() {
return (value, thrown) -> {
if (thrown == null) {
- logger.trace("{}: complete and stop future", ident(this));
complete(value);
} else {
- logger.trace("{}: complete exceptionally and stop future", ident(this));
completeExceptionally(thrown);
}
-
- futures.stop();
};
}
+ /**
+ * Adds a future to the controller and arranges for it to be removed from the
+ * controller when it completes, whether it throws an exception. If the
+ * controller has already been stopped, then the future is canceled and a new,
+ * incomplete future is returned.
+ *
+ * @param future future to be wrapped
+ * @return a new future
+ */
+ public CompletableFuture<T> wrap(CompletableFuture<T> future) {
+ if (!isRunning()) {
+ logger.trace("{}: not running, skipping next task {}", ident(this), ident(future));
+ future.cancel(false);
+ return new CompletableFuture<>();
+ }
+
+ add(future);
+ return future.whenComplete(this.delayedRemove(future));
+ }
+
/**
* Adds a function whose return value is to be canceled when this controller is
* stopped. Note: if the controller is already stopped, then the function will
* <i>not</i> be executed.
*
- * @param futureMaker function to be invoked in the future
+ * @param futureMaker function to be invoked to create the future
+ * @return a function to create the future and arrange for it to be managed by this
+ * controller
*/
- public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) {
+ public <F> Function<F, CompletableFuture<F>> wrap(Function<F, CompletableFuture<F>> futureMaker) {
return input -> {
if (!isRunning()) {
CompletableFuture<F> future = futureMaker.apply(input);
add(future);
- return future;
+ return future.whenComplete(delayedRemove(future));
};
}
logger.trace("{}: remove listener {}", ident(this), ident(listener));
futures.remove(listener);
}
+
+ /**
+ * Performs an operation, stops the futures, and returns the value from the operation.
+ * Logs a message using the given arguments.
+ *
+ *
+ * @param <R> type of value to be returned
+ * @param supplier operation to perform
+ * @param message message to be logged
+ * @param args message arguments to fill "{}" place-holders
+ * @return the operation's result
+ */
+ private <R> R doAndStop(Supplier<R> supplier, String message, Object... args) {
+ try {
+ logger.trace(message, args);
+ return supplier.get();
+
+ } finally {
+ logger.trace("{}: stopping this future", ident(this));
+ futures.stop();
+ }
+ }
}