92843e28af58d27c1e834de9e16c643f819abbb4
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / pipeline / PipelineControllerFuture.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.pipeline;
22
23 import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
24
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.Future;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.BiConsumer;
30 import java.util.function.Function;
31 import java.util.function.Supplier;
32 import lombok.NoArgsConstructor;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Pipeline controller, used by operations within the pipeline to determine if they should
38  * continue to run. Whenever this is canceled or completed, it automatically cancels all
39  * futures and runs all listeners that have been added.
40  */
41 @NoArgsConstructor
42 public class PipelineControllerFuture<T> extends CompletableFuture<T> {
43
44     private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
45
46     private static final String COMPLETE_EXCEPT_MSG = "{}: complete future with exception";
47     private static final String CANCEL_MSG = "{}: cancel future";
48     private static final String COMPLETE_MSG = "{}: complete future";
49
50     /**
51      * Tracks items added to this controller via one of the <i>add</i> methods.
52      */
53     private final FutureManager futures = new FutureManager();
54
55
56     @Override
57     public boolean cancel(boolean mayInterruptIfRunning) {
58         return doAndStop(() -> super.cancel(mayInterruptIfRunning), CANCEL_MSG, ident(this));
59     }
60
61     @Override
62     public boolean complete(T value) {
63         return doAndStop(() -> super.complete(value), COMPLETE_MSG, ident(this));
64     }
65
66     @Override
67     public boolean completeExceptionally(Throwable ex) {
68         return doAndStop(() -> super.completeExceptionally(ex), COMPLETE_EXCEPT_MSG, ident(this));
69     }
70
71     @Override
72     public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor executor) {
73         return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)), executor);
74     }
75
76     @Override
77     public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) {
78         return super.completeAsync(() -> doAndStop(supplier, COMPLETE_MSG, ident(this)));
79     }
80
81     @Override
82     public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) {
83         logger.info("{}: set future timeout to {} {}", ident(this), timeout, unit);
84         return super.completeOnTimeout(value, timeout, unit);
85     }
86
87     @Override
88     public <U> PipelineControllerFuture<U> newIncompleteFuture() {
89         return new PipelineControllerFuture<>();
90     }
91
92     /**
93      * Generates a function that, when invoked, will remove the given future. This is
94      * typically added onto the end of a pipeline via one of the
95      * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
96      *
97      * @return a function that removes the given future
98      */
99     public <F> BiConsumer<F, Throwable> delayedRemove(Future<F> future) {
100         return (value, thrown) -> remove(future);
101     }
102
103     /**
104      * Generates a function that, when invoked, will remove the given listener. This is
105      * typically added onto the end of a pipeline via one of the
106      * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
107      *
108      * @return a function that removes the given listener
109      */
110     public <F> BiConsumer<F, Throwable> delayedRemove(Runnable listener) {
111         return (value, thrown) -> remove(listener);
112     }
113
114     /**
115      * Generates a function that, when invoked, will stop all pipeline listeners and
116      * complete this future. This is typically added onto the end of a pipeline via one of
117      * the {@link CompletableFuture#whenComplete(BiConsumer)} methods.
118      *
119      * @return a function that stops all pipeline listeners
120      */
121     public BiConsumer<T, Throwable> delayedComplete() {
122         return (value, thrown) -> {
123             if (thrown == null) {
124                 complete(value);
125             } else {
126                 completeExceptionally(thrown);
127             }
128         };
129     }
130
131     /**
132      * Adds a future to the controller and arranges for it to be removed from the
133      * controller when it completes, whether or not it throws an exception. If the
134      * controller has already been stopped, then the future is canceled and a new,
135      * incomplete future is returned.
136      *
137      * @param future future to be wrapped
138      * @return a new future
139      */
140     public CompletableFuture<T> wrap(CompletableFuture<T> future) {
141         if (!isRunning()) {
142             logger.trace("{}: not running, skipping next task {}", ident(this), ident(future));
143             future.cancel(false);
144             return new CompletableFuture<>();
145         }
146
147         add(future);
148         return future.whenComplete(this.delayedRemove(future));
149     }
150
151     /**
152      * Adds a function whose return value is to be canceled when this controller is
153      * stopped. Note: if the controller is already stopped, then the function will
154      * <i>not</i> be executed.
155      *
156      * @param futureMaker function to be invoked to create the future
157      * @return a function to create the future and arrange for it to be managed by this
158      *         controller
159      */
160     public <F> Function<F, CompletableFuture<F>> wrap(Function<F, CompletableFuture<F>> futureMaker) {
161
162         return input -> {
163             if (!isRunning()) {
164                 logger.trace("{}: discarded new future", ident(this));
165                 return new CompletableFuture<>();
166             }
167
168             CompletableFuture<F> future = futureMaker.apply(input);
169             add(future);
170
171             return future.whenComplete(delayedRemove(future));
172         };
173     }
174
175     public <F> void add(Future<F> future) {
176         logger.trace("{}: add future {}", ident(this), ident(future));
177         futures.add(future);
178     }
179
180     public void add(Runnable listener) {
181         logger.trace("{}: add listener {}", ident(this), ident(listener));
182         futures.add(listener);
183     }
184
185     public boolean isRunning() {
186         return futures.isRunning();
187     }
188
189     public <F> void remove(Future<F> future) {
190         logger.trace("{}: remove future {}", ident(this), ident(future));
191         futures.remove(future);
192     }
193
194     public void remove(Runnable listener) {
195         logger.trace("{}: remove listener {}", ident(this), ident(listener));
196         futures.remove(listener);
197     }
198
199     /**
200      * Performs an operation, stops the futures, and returns the value from the operation.
201      * Logs a message using the given arguments.
202      *
203      *
204      * @param <R> type of value to be returned
205      * @param supplier operation to perform
206      * @param message message to be logged
207      * @param args message arguments to fill "{}" place-holders
208      * @return the operation's result
209      */
210     private <R> R doAndStop(Supplier<R> supplier, String message, Object... args) {
211         try {
212             logger.trace(message, args);
213             return supplier.get();
214
215         } finally {
216             logger.trace("{}: stopping this future", ident(this));
217             futures.stop();
218         }
219     }
220 }