2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.controlloop.actorserviceprovider.pipeline;
23 import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.BiConsumer;
30 import java.util.function.Function;
31 import java.util.function.Supplier;
32 import lombok.NoArgsConstructor;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * Pipeline controller, used by operations within the pipeline to determine if they should
38 * continue to run. Whenever this is canceled or completed, it automatically cancels all
39 * futures and runs all listeners that have been added.
42 public class PipelineControllerFuture<T> extends CompletableFuture<T> {
44 private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
46 private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception";
47 private static final String CANCEL_MSG = "{}: cancel future";
48 private static final String COMPLETE_MSG = "{}: complete future";
51 * Tracks items added to this controller via one of the <i>add</i> methods.
53 private final FutureManager futures = new FutureManager();
57 public boolean cancel(boolean mayInterruptIfRunning) {
58 return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this));
62 public boolean complete(T value) {
63 return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this));
67 public boolean completeExceptionally(Throwable ex) {
68 return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this));
72 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor) {
73 return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor);
77 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
78 return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)));
82 public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) {
83 logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit);
84 return super.completeOnTimeout(value, timeout, unit);
88 public <U> PipelineControllerFuture<U> newIncompleteFuture() {
89 return new PipelineControllerFuture<>();
93 * Generates a function that, when invoked, will remove the given future. This is
94 * typically added onto the end of a pipeline via one of the
95 * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
97 * @return a function that removes the given future
99 public <F> BiConsumer<F, Throwable> delayedRemove(Future<F> future) {
100 return (value, thrown) -> remove(future);
104 * Generates a function that, when invoked, will remove the given listener. This is
105 * typically added onto the end of a pipeline via one of the
106 * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
108 * @return a function that removes the given listener
110 public <F> BiConsumer<F, Throwable> delayedRemove(Runnable listener) {
111 return (value, thrown) -> remove(listener);
115 * Generates a function that, when invoked, will stop all pipeline listeners and
116 * complete this future. This is typically added onto the end of a pipeline via one of
117 * the {@link CompletableFuture#whenComplete(BiConsumer)} methods.
119 * @return a function that stops all pipeline listeners
121 public BiConsumer<T, Throwable> delayedComplete() {
122 return (value, thrown) -> {
123 if (thrown == null) {
126 completeExceptionally(thrown);
132 * Adds a future to the controller and arranges for it to be removed from the
133 * controller when it completes, whether or not it throws an exception. If the
134 * controller has already been stopped, then the future is canceled and a new,
135 * incomplete future is returned.
137 * @param future future to be wrapped
138 * @return a new future
140 public CompletableFuture<T> wrap(CompletableFuture<T> future) {
142 logger.trace("{}: not running, skipping next task {}", ident(this), ident(future));
143 future.cancel(false);
144 return new CompletableFuture<>();
148 return future.whenComplete(this.delayedRemove(future));
152 * Adds a function whose return value is to be canceled when this controller is
153 * stopped. Note: if the controller is already stopped, then the function will
154 * <i>not</i> be executed.
156 * @param futureMaker function to be invoked to create the future
157 * @return a function to create the future and arrange for it to be managed by this
160 public <F> Function<F, CompletableFuture<F>> wrap(Function<F, CompletableFuture<F>> futureMaker) {
164 logger.trace("{}: discarded new future", ident(this));
165 return new CompletableFuture<>();
168 CompletableFuture<F> future = futureMaker.apply(input);
171 return future.whenComplete(delayedRemove(future));
175 public <F> void add(Future<F> future) {
176 logger.trace("{}: add future {}", ident(this), ident(future));
180 public void add(Runnable listener) {
181 logger.trace("{}: add listener {}", ident(this), ident(listener));
182 futures.add(listener);
185 public boolean isRunning() {
186 return futures.isRunning();
189 public <F> void remove(Future<F> future) {
190 logger.trace("{}: remove future {}", ident(this), ident(future));
191 futures.remove(future);
194 public void remove(Runnable listener) {
195 logger.trace("{}: remove listener {}", ident(this), ident(listener));
196 futures.remove(listener);
200 * Performs an operation, stops the futures, and returns the value from the operation.
201 * Logs a message using the given arguments.
204 * @param <R> type of value to be returned
205 * @param supplier operation to perform
206 * @param message message to be logged
207 * @param args message arguments to fill "{}" place-holders
208 * @return the operation's result
210 private <R> R doAndStop(Supplier<R> supplier, String message, Object... args) {
212 logger.trace(message, args);
213 return supplier.get();
216 logger.trace("{}: stopping this future", ident(this));