Remove dmaap from models
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / pipeline / PipelineControllerFuture.java
index 96c8f9e..288397e 100644 (file)
@@ -23,41 +23,70 @@ package org.onap.policy.controlloop.actorserviceprovider.pipeline;
 import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import lombok.NoArgsConstructor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Pipeline controller, used by operations within the pipeline to determine if they should
- * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the
- * pipeline.
+ * continue to run. Whenever this is canceled or completed, it automatically cancels all
+ * futures and runs all listeners that have been added.
  */
 @NoArgsConstructor
 public class PipelineControllerFuture<T> extends CompletableFuture<T> {
 
     private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
 
+    private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception";
+    private static final String CANCEL_MSG = "{}: cancel future";
+    private static final String COMPLETE_MSG = "{}: complete future";
+
     /**
      * Tracks items added to this controller via one of the <i>add</i> methods.
      */
     private final FutureManager futures = new FutureManager();
 
 
-    /**
-     * Cancels and stops the pipeline, in that order.
-     */
     @Override
     public boolean cancel(boolean mayInterruptIfRunning) {
-        try {
-            logger.trace("{}: cancel future", ident(this));
-            return super.cancel(mayInterruptIfRunning);
+        return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this));
+    }
 
-        } finally {
-            futures.stop();
-        }
+    @Override
+    public boolean complete(T value) {
+        return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this));
+    }
+
+    @Override
+    public boolean completeExceptionally(Throwable ex) {
+        return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this));
+    }
+
+    @Override
+    public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor) {
+        return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor);
+    }
+
+    @Override
+    public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
+        return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)));
+    }
+
+    @Override
+    public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) {
+        logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit);
+        return super.completeOnTimeout(value, timeout, unit);
+    }
+
+    @Override
+    public <U> PipelineControllerFuture<U> newIncompleteFuture() {
+        return new PipelineControllerFuture<>();
     }
 
     /**
@@ -67,11 +96,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
      *
      * @return a function that removes the given future
      */
-    public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) {
-        return (value, thrown) -> {
-            logger.trace("{}: remove future {}", ident(this), ident(future));
-            remove(future);
-        };
+    public <F> BiConsumer<F, Throwable> delayedRemove(Future<F> future) {
+        return (value, thrown) -> remove(future);
     }
 
     /**
@@ -81,11 +107,8 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
      *
      * @return a function that removes the given listener
      */
-    public BiConsumer<T, Throwable> delayedRemove(Runnable listener) {
-        return (value, thrown) -> {
-            logger.trace("{}: remove listener {}", ident(this), ident(listener));
-            remove(listener);
-        };
+    public <F> BiConsumer<F, Throwable> delayedRemove(Runnable listener) {
+        return (value, thrown) -> remove(listener);
     }
 
     /**
@@ -98,25 +121,43 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
     public BiConsumer<T, Throwable> delayedComplete() {
         return (value, thrown) -> {
             if (thrown == null) {
-                logger.trace("{}: complete and stop future", ident(this));
                 complete(value);
             } else {
-                logger.trace("{}: complete exceptionally and stop future", ident(this));
                 completeExceptionally(thrown);
             }
-
-            futures.stop();
         };
     }
 
+    /**
+     * Adds a future to the controller and arranges for it to be removed from the
+     * controller when it completes, whether it throws an exception. If the
+     * controller has already been stopped, then the future is canceled and a new,
+     * incomplete future is returned.
+     *
+     * @param future future to be wrapped
+     * @return a new future
+     */
+    public CompletableFuture<T> wrap(CompletableFuture<T> future) {
+        if (!isRunning()) {
+            logger.trace("{}: not running, skipping next task {}", ident(this), ident(future));
+            future.cancel(false);
+            return new CompletableFuture<>();
+        }
+
+        add(future);
+        return future.whenComplete(this.delayedRemove(future));
+    }
+
     /**
      * Adds a function whose return value is to be canceled when this controller is
      * stopped. Note: if the controller is already stopped, then the function will
      * <i>not</i> be executed.
      *
-     * @param futureMaker function to be invoked in the future
+     * @param futureMaker function to be invoked to create the future
+     * @return a function to create the future and arrange for it to be managed by this
+     *         controller
      */
-    public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) {
+    public <F> Function<F, CompletableFuture<F>> wrap(Function<F, CompletableFuture<F>> futureMaker) {
 
         return input -> {
             if (!isRunning()) {
@@ -127,7 +168,7 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
             CompletableFuture<F> future = futureMaker.apply(input);
             add(future);
 
-            return future;
+            return future.whenComplete(delayedRemove(future));
         };
     }
 
@@ -154,4 +195,26 @@ public class PipelineControllerFuture<T> extends CompletableFuture<T> {
         logger.trace("{}: remove listener {}", ident(this), ident(listener));
         futures.remove(listener);
     }
+
+    /**
+     * Performs an operation, stops the futures, and returns the value from the operation.
+     * Logs a message using the given arguments.
+     *
+     *
+     * @param <R> type of value to be returned
+     * @param supplier operation to perform
+     * @param message message to be logged
+     * @param args message arguments to fill "{}" place-holders
+     * @return the operation's result
+     */
+    private <R> R doAndStop(Supplier<R> supplier, String message, Object... args) {
+        try {
+            logger.trace(message, args);
+            return supplier.get();
+
+        } finally {
+            logger.trace("{}: stopping this future", ident(this));
+            futures.stop();
+        }
+    }
 }