X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=models-interactions%2Fmodel-actors%2FactorServiceProvider%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fpolicy%2Fcontrolloop%2Factorserviceprovider%2Fpipeline%2FPipelineControllerFuture.java;h=288397e67ac9999d93c1245b23131f03e869f23a;hb=49f07db935d114b72a44e446867b16262dd552aa;hp=96c8f9e058779334f3d669970cb41faea0b4bcc2;hpb=accad88260f99c1b5c5329285b73aa84349e623b;p=policy%2Fmodels.git diff --git a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java index 96c8f9e05..288397e67 100644 --- a/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java +++ b/models-interactions/model-actors/actorServiceProvider/src/main/java/org/onap/policy/controlloop/actorserviceprovider/pipeline/PipelineControllerFuture.java @@ -23,41 +23,70 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline; import static org.onap.policy.controlloop.actorserviceprovider.Util.ident; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; import lombok.NoArgsConstructor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Pipeline controller, used by operations within the pipeline to determine if they should - * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the - * pipeline. + * continue to run. Whenever this is canceled or completed, it automatically cancels all + * futures and runs all listeners that have been added. */ @NoArgsConstructor public class PipelineControllerFuture extends CompletableFuture { private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class); + private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception"; + private static final String CANCEL_MSG = "{}: cancel future"; + private static final String COMPLETE_MSG = "{}: complete future"; + /** * Tracks items added to this controller via one of the add methods. */ private final FutureManager futures = new FutureManager(); - /** - * Cancels and stops the pipeline, in that order. - */ @Override public boolean cancel(boolean mayInterruptIfRunning) { - try { - logger.trace("{}: cancel future", ident(this)); - return super.cancel(mayInterruptIfRunning); + return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this)); + } - } finally { - futures.stop(); - } + @Override + public boolean complete(T value) { + return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this)); + } + + @Override + public boolean completeExceptionally(Throwable ex) { + return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this)); + } + + @Override + public CompletableFuture completeAsync(Supplier supplier, Executor executor) { + return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor); + } + + @Override + public CompletableFuture completeAsync(Supplier supplier) { + return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this))); + } + + @Override + public CompletableFuture completeOnTimeout(T value, long timeout, TimeUnit unit) { + logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit); + return super.completeOnTimeout(value, timeout, unit); + } + + @Override + public PipelineControllerFuture newIncompleteFuture() { + return new PipelineControllerFuture<>(); } /** @@ -67,11 +96,8 @@ public class PipelineControllerFuture extends CompletableFuture { * * @return a function that removes the given future */ - public BiConsumer delayedRemove(Future future) { - return (value, thrown) -> { - logger.trace("{}: remove future {}", ident(this), ident(future)); - remove(future); - }; + public BiConsumer delayedRemove(Future future) { + return (value, thrown) -> remove(future); } /** @@ -81,11 +107,8 @@ public class PipelineControllerFuture extends CompletableFuture { * * @return a function that removes the given listener */ - public BiConsumer delayedRemove(Runnable listener) { - return (value, thrown) -> { - logger.trace("{}: remove listener {}", ident(this), ident(listener)); - remove(listener); - }; + public BiConsumer delayedRemove(Runnable listener) { + return (value, thrown) -> remove(listener); } /** @@ -98,25 +121,43 @@ public class PipelineControllerFuture extends CompletableFuture { public BiConsumer delayedComplete() { return (value, thrown) -> { if (thrown == null) { - logger.trace("{}: complete and stop future", ident(this)); complete(value); } else { - logger.trace("{}: complete exceptionally and stop future", ident(this)); completeExceptionally(thrown); } - - futures.stop(); }; } + /** + * Adds a future to the controller and arranges for it to be removed from the + * controller when it completes, whether it throws an exception. If the + * controller has already been stopped, then the future is canceled and a new, + * incomplete future is returned. + * + * @param future future to be wrapped + * @return a new future + */ + public CompletableFuture wrap(CompletableFuture future) { + if (!isRunning()) { + logger.trace("{}: not running, skipping next task {}", ident(this), ident(future)); + future.cancel(false); + return new CompletableFuture<>(); + } + + add(future); + return future.whenComplete(this.delayedRemove(future)); + } + /** * Adds a function whose return value is to be canceled when this controller is * stopped. Note: if the controller is already stopped, then the function will * not be executed. * - * @param futureMaker function to be invoked in the future + * @param futureMaker function to be invoked to create the future + * @return a function to create the future and arrange for it to be managed by this + * controller */ - public Function> add(Function> futureMaker) { + public Function> wrap(Function> futureMaker) { return input -> { if (!isRunning()) { @@ -127,7 +168,7 @@ public class PipelineControllerFuture extends CompletableFuture { CompletableFuture future = futureMaker.apply(input); add(future); - return future; + return future.whenComplete(delayedRemove(future)); }; } @@ -154,4 +195,26 @@ public class PipelineControllerFuture extends CompletableFuture { logger.trace("{}: remove listener {}", ident(this), ident(listener)); futures.remove(listener); } + + /** + * Performs an operation, stops the futures, and returns the value from the operation. + * Logs a message using the given arguments. + * + * + * @param type of value to be returned + * @param supplier operation to perform + * @param message message to be logged + * @param args message arguments to fill "{}" place-holders + * @return the operation's result + */ + private R doAndStop(Supplier supplier, String message, Object... args) { + try { + logger.trace(message, args); + return supplier.get(); + + } finally { + logger.trace("{}: stopping this future", ident(this)); + futures.stop(); + } + } }