Merge "Change recipe to operation to match type"
[policy/models.git] / models-interactions / model-actors / actorServiceProvider / src / main / java / org / onap / policy / controlloop / actorserviceprovider / pipeline / PipelineControllerFuture.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.controlloop.actorserviceprovider.pipeline;
22
23 import static org.onap.policy.controlloop.actorserviceprovider.Util.ident;
24
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.Future;
27 import java.util.function.BiConsumer;
28 import java.util.function.Function;
29 import lombok.NoArgsConstructor;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 /**
34  * Pipeline controller, used by operations within the pipeline to determine if they should
35  * continue to run. If {@link #cancel(boolean)} is invoked, it automatically stops the
36  * pipeline.
37  */
38 @NoArgsConstructor
39 public class PipelineControllerFuture<T> extends CompletableFuture<T> {
40
41     private static final Logger logger = LoggerFactory.getLogger(PipelineControllerFuture.class);
42
43     /**
44      * Tracks items added to this controller via one of the <i>add</i> methods.
45      */
46     private final FutureManager futures = new FutureManager();
47
48
49     /**
50      * Cancels and stops the pipeline, in that order.
51      */
52     @Override
53     public boolean cancel(boolean mayInterruptIfRunning) {
54         try {
55             logger.trace("{}: cancel future", ident(this));
56             return super.cancel(mayInterruptIfRunning);
57
58         } finally {
59             futures.stop();
60         }
61     }
62
63     /**
64      * Generates a function that, when invoked, will remove the given future. This is
65      * typically added onto the end of a pipeline via one of the
66      * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
67      *
68      * @return a function that removes the given future
69      */
70     public <F> BiConsumer<T, Throwable> delayedRemove(Future<F> future) {
71         return (value, thrown) -> {
72             logger.trace("{}: remove future {}", ident(this), ident(future));
73             remove(future);
74         };
75     }
76
77     /**
78      * Generates a function that, when invoked, will remove the given listener. This is
79      * typically added onto the end of a pipeline via one of the
80      * {@link CompletableFuture#whenComplete(BiConsumer)} methods.
81      *
82      * @return a function that removes the given listener
83      */
84     public BiConsumer<T, Throwable> delayedRemove(Runnable listener) {
85         return (value, thrown) -> {
86             logger.trace("{}: remove listener {}", ident(this), ident(listener));
87             remove(listener);
88         };
89     }
90
91     /**
92      * Generates a function that, when invoked, will stop all pipeline listeners and
93      * complete this future. This is typically added onto the end of a pipeline via one of
94      * the {@link CompletableFuture#whenComplete(BiConsumer)} methods.
95      *
96      * @return a function that stops all pipeline listeners
97      */
98     public BiConsumer<T, Throwable> delayedComplete() {
99         return (value, thrown) -> {
100             if (thrown == null) {
101                 logger.trace("{}: complete and stop future", ident(this));
102                 complete(value);
103             } else {
104                 logger.trace("{}: complete exceptionally and stop future", ident(this));
105                 completeExceptionally(thrown);
106             }
107
108             futures.stop();
109         };
110     }
111
112     /**
113      * Adds a function whose return value is to be canceled when this controller is
114      * stopped. Note: if the controller is already stopped, then the function will
115      * <i>not</i> be executed.
116      *
117      * @param futureMaker function to be invoked in the future
118      */
119     public <F> Function<F, CompletableFuture<F>> add(Function<F, CompletableFuture<F>> futureMaker) {
120
121         return input -> {
122             if (!isRunning()) {
123                 logger.trace("{}: discarded new future", ident(this));
124                 return new CompletableFuture<>();
125             }
126
127             CompletableFuture<F> future = futureMaker.apply(input);
128             add(future);
129
130             return future;
131         };
132     }
133
134     public <F> void add(Future<F> future) {
135         logger.trace("{}: add future {}", ident(this), ident(future));
136         futures.add(future);
137     }
138
139     public void add(Runnable listener) {
140         logger.trace("{}: add listener {}", ident(this), ident(listener));
141         futures.add(listener);
142     }
143
144     public boolean isRunning() {
145         return futures.isRunning();
146     }
147
148     public <F> void remove(Future<F> future) {
149         logger.trace("{}: remove future {}", ident(this), ident(future));
150         futures.remove(future);
151     }
152
153     public void remove(Runnable listener) {
154         logger.trace("{}: remove listener {}", ident(this), ident(listener));
155         futures.remove(listener);
156     }
157 }