/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.policy.controlloop.actorserviceprovider.pipeline;
23 import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.Future;
27 import java.util.function.BiConsumer;
28 import java.util.function.Function;
29 import lombok.NoArgsConstructor;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * Pipeline controller, used by operations within the pipeline to determine if they should
35 * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the
39 public class PipelineControllerFuture<T> extends CompletableFuture<T> {
41 private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
44 * Tracks items added to this controller via one of the <i>add</i> methods.
46 private final FutureManager futures = new FutureManager();
50 * Cancels and stops the pipeline, in that order.
53 public boolean cancel(boolean mayInterruptIfRunning) {
55 logger.trace("{}: cancel future", ident(this));
56 return super.cancel(mayInterruptIfRunning);
64 * Generates a function that, when invoked, will remove the given future. This is
65 * typically added onto the end of a pipeline via one of the
66 * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
68 * @return a function that removes the given future
70 public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) {
71 return (value, thrown) -> {
72 logger.trace("{}: remove future {}", ident(this), ident(future));
78 * Generates a function that, when invoked, will remove the given listener. This is
79 * typically added onto the end of a pipeline via one of the
80 * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
82 * @return a function that removes the given listener
84 public BiConsumer<T, Throwable> delayedRemove(Runnable listener) {
85 return (value, thrown) -> {
86 logger.trace("{}: remove listener {}", ident(this), ident(listener));
92 * Generates a function that, when invoked, will stop all pipeline listeners and
93 * complete this future. This is typically added onto the end of a pipeline via one of
94 * the {@link CompletableFuture#whenComplete(BiConsumer)} methods.
96 * @return a function that stops all pipeline listeners
98 public BiConsumer<T, Throwable> delayedComplete() {
99 return (value, thrown) -> {
100 if (thrown == null) {
101 logger.trace("{}: complete and stop future", ident(this));
104 logger.trace("{}: complete exceptionally and stop future", ident(this));
105 completeExceptionally(thrown);
113 * Adds a function whose return value is to be canceled when this controller is
114 * stopped. Note: if the controller is already stopped, then the function will
115 * <i>not</i> be executed.
117 * @param futureMaker function to be invoked in the future
119 public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) {
123 logger.trace("{}: discarded new future", ident(this));
124 return new CompletableFuture<>();
127 CompletableFuture<F> future = futureMaker.apply(input);
134 public <F> void add(Future<F> future) {
135 logger.trace("{}: add future {}", ident(this), ident(future));
139 public void add(Runnable listener) {
140 logger.trace("{}: add listener {}", ident(this), ident(listener));
141 futures.add(listener);
144 public boolean isRunning() {
145 return futures.isRunning();
148 public <F> void remove(Future<F> future) {
149 logger.trace("{}: remove future {}", ident(this), ident(future));
150 futures.remove(future);
153 public void remove(Runnable listener) {
154 logger.trace("{}: remove listener {}", ident(this), ident(listener));
155 futures.remove(listener);