1 package org.onap.sdc.dcae.catalog.commons;
4 import java.util.LinkedList;
5 import java.util.Collections;
7 import java.util.concurrent.CountDownLatch;
8 import java.util.function.Function;
10 import org.onap.sdc.common.onaplog.OnapLoggerDebug;
11 import org.onap.sdc.common.onaplog.OnapLoggerError;
12 import org.onap.sdc.dcae.catalog.commons.Future;
13 import org.onap.sdc.dcae.catalog.commons.FutureHandler;
14 import org.onap.sdc.common.onaplog.Enums.LogLevel;
/**
 * Static helpers around the project's {@code Future} abstraction: factory
 * methods ({@code failedFuture}, {@code succeededFuture}, {@code future}),
 * result/error mapping ({@code advance}) and the nested implementations
 * {@code BasicFuture}, {@code BasicHandler} and {@code Accumulator}.
 *
 * NOTE(review): the type parameter {@code T} of this outer class is not used
 * by any of the static members visible here, which declare their own type
 * parameters.
 */
19 public class Futures<T> {
/**
 * Factory for a future associated with an error.
 *
 * Only the creation of the {@code BasicFuture} is visible; the expression
 * continues past the last line shown. Presumably the continuation marks the
 * new future as failed with {@code theError} -- confirm against the full file.
 *
 * @param theError the error the caller wants the returned future to carry
 * @return a {@code Future<T>} built from a new {@code BasicFuture<T>}
 */
25 public static <T> Future<T> failedFuture(Throwable theError) {
26 return new BasicFuture<T>()
/**
 * Factory for a future associated with an already available result.
 *
 * Only the creation of the {@code BasicFuture} is visible; the expression
 * continues past the last line shown. Presumably the continuation completes
 * the new future with {@code theResult} -- confirm against the full file.
 *
 * @param theResult the value the caller wants the returned future to carry
 * @return a {@code Future<T>} built from a new {@code BasicFuture<T>}
 */
30 public static <T> Future<T> succeededFuture(T theResult) {
31 return new BasicFuture<T>()
/**
 * Creates a fresh {@code BasicFuture}. Nothing in this method sets a result
 * or a cause on it; the caller is expected to do so later through
 * {@code result(T)} or {@code cause(Throwable)}.
 *
 * @return a newly constructed {@code BasicFuture<T>}
 */
35 public static <T> Future<T> future() {
36 return new BasicFuture<T>();
/**
 * Convenience overload of {@code advance} that maps only the result.
 *
 * Delegates to the three-argument overload with {@code Function.identity()}
 * as the error function, so a failure cause is handed on unchanged.
 *
 * @param theStep the future whose outcome is being mapped
 * @param theResultFunction applied to the result of {@code theStep}
 * @return whatever the three-argument {@code advance} returns
 */
39 public static <U,V> Future<V> advance(Future<U> theStep,
40 final Function<U,V> theResultFunction) {
41 return advance(theStep, theResultFunction, Function.identity());
/**
 * Derives a {@code Future<V>} from a {@code Future<U>} by installing a handler
 * on {@code theStep} that maps its outcome onto a new future {@code adv}.
 *
 * Note that this calls {@code theStep.setHandler(...)}; what happens to a
 * handler previously installed on {@code theStep} depends on that future's
 * implementation ({@code BasicFuture} keeps a single handler field).
 *
 * @param theStep the future whose outcome is being mapped
 * @param theResultFunction applied to {@code theStep}'s result; its output is
 *        passed to {@code adv.result(...)}
 * @param theErrorFunction applied to {@code theStep}'s cause; its output is
 *        passed to {@code adv.cause(...)}
 * @return a future of the mapped type; {@code adv} is created for this purpose,
 *         the return statement itself is outside the visible lines
 */
44 public static <U,V> Future<V> advance(Future<U> theStep,
45 final Function<U,V> theResultFunction,
46 final Function<Throwable, Throwable> theErrorFunction) {
// The derived future; completed from inside the handler below.
47 final Future<V> adv = new BasicFuture<V>();
48 theStep.setHandler(new FutureHandler<U>() {
49 public void handle(Future<U> theResult) {
// Failure path: translate the cause and fail the derived future.
50 if (theResult.failed())
51 adv.cause(theErrorFunction.apply(theResult.cause()));
// Success path: translate the result and complete the derived future.
// NOTE(review): presumably the else-branch of the check above; the line
// between the two statements is not visible here -- confirm.
53 adv.result(theResultFunction.apply(theResult.result()));
/**
 * Basic {@code Future} implementation that stores a result or a cause, keeps
 * success/failure flags and notifies a single {@code FutureHandler}.
 *
 * NOTE(review): none of the visible fields is declared volatile and no
 * synchronization is visible in this class; confirm the intended threading
 * model before sharing instances across threads.
 */
60 public static class BasicFuture<T> implements Future<T> {
// Completion flags. The declaration continues on a line not shown here
// (a 'failed' field is read by failed()/complete() below).
62 protected boolean succeeded,
// The one handler slot: assigning a new handler replaces the previous one.
65 protected FutureHandler<T> handler;
// Failure cause as passed to cause(Throwable).
66 protected Throwable cause;
/**
 * Protected constructor; the Futures factory methods and subclasses such as
 * Accumulator create instances. Body not visible here.
 */
70 protected BasicFuture() {
/**
 * Stores {@code theResult} and flags this future as succeeded. Further
 * statements of this method (and its return) are not visible here.
 *
 * @param theResult the value to store
 */
77 public Future<T> result(T theResult) {
78 this.result = theResult;
79 this.succeeded = true;
/** Accessor for the failure cause; body not visible here. */
86 public Throwable cause() {
/**
 * Stores {@code theCause} and clears the succeeded flag. Statements between
 * the two visible assignments, and after them, are not visible here.
 *
 * @param theCause the failure to store
 */
90 public Future<T> cause(Throwable theCause) {
91 this.cause = theCause;
94 this.succeeded = false;
/** @return the current value of the succeeded flag */
99 public boolean succeeded() {
100 return this.succeeded;
/** Failure-state query; body not visible here. */
103 public boolean failed() {
/** @return true once either the failed or the succeeded flag is set */
107 public boolean complete() {
108 return this.failed || this.succeeded;
/**
 * Installs {@code theHandler} in the single handler slot, replacing any
 * handler set before. The remainder of the method is not visible here.
 *
 * @param theHandler the handler to store
 */
111 public Future<T> setHandler(FutureHandler<T> theHandler) {
112 this.handler = theHandler;
/**
 * Builds a {@code BasicHandler}, calls its {@code waitForCompletion()} and,
 * on the path visible here, rethrows the stored cause. Lines between the
 * visible statements (presumably installing the handler and checking for
 * failure) are not shown -- confirm.
 *
 * @throws Exception the stored cause, cast to {@code Exception}
 */
117 public T waitForResult() throws Exception {
118 BasicHandler<T> hnd = buildHandler();
120 hnd.waitForCompletion();
// NOTE(review): cause is typed Throwable and cause(Throwable) accepts any
// Throwable, so this cast raises ClassCastException when the stored cause is
// not an Exception (e.g. an Error).
122 throw (Exception)cause();
/**
 * Builds a {@code BasicHandler} and calls its {@code waitForCompletion()}.
 * Lines between and after the visible statements are not shown.
 *
 * @throws InterruptedException declared by the signature
 */
127 public Future<T> waitForCompletion() throws InterruptedException {
128 BasicHandler<T> hnd = buildHandler();
130 hnd.waitForCompletion();
/**
 * Invokes the stored handler with this future, but only when a handler is
 * set and this future is complete.
 */
134 protected void callHandler() {
135 if (this.handler != null && complete()) {
136 this.handler.handle(this);
/**
 * Hook that supplies the handler used by the waitFor* methods; protected,
 * so subclasses may override it.
 *
 * @return a new {@code BasicHandler<T>} created with its no-argument constructor
 */
140 protected BasicHandler<T> buildHandler() {
141 return new BasicHandler<T>();
/**
 * {@code FutureHandler} that records the outcome of the future(s) it handles
 * and counts down a {@code CountDownLatch} on every {@code handle} call.
 */
147 public static class BasicHandler<T>
148 implements FutureHandler<T> {
// Outcome captured by process(): at most one of result/error is assigned
// per handled future.
150 protected T result = null;
151 protected Throwable error = null;
// Counted down once per handle() call; assigned by the constructors.
152 protected CountDownLatch latch = null;
// Delegating constructor call: uses a latch with a count of 1. The header of
// the enclosing constructor is not visible here.
155 this(new CountDownLatch(1));
/**
 * Package-private constructor taking the latch to count down.
 *
 * @param theLatch latch stored as-is; handle() tolerates null
 */
158 BasicHandler(CountDownLatch theLatch) {
159 this.latch = theLatch;
/**
 * Handler entry point. A statement between the signature and the latch check
 * is not visible here (presumably the call to process()); afterwards the
 * latch, if any, is counted down once.
 */
162 public void handle(Future<T> theResult) {
164 if (this.latch != null) {
165 this.latch.countDown();
/**
 * Captures the outcome of {@code theResult}: the cause when it failed, the
 * result otherwise. NOTE(review): the lines separating the two assignments
 * (presumably the else branch) are not visible here -- confirm.
 */
169 protected void process(Future<T> theResult) {
170 if (theResult.failed()) {
171 this.error = theResult.cause();
174 this.result = theResult.result();
/**
 * Result accessor. Only part of the body is visible: when no error was
 * recorded control takes a branch that is not shown; otherwise the recorded
 * error is rethrown wrapped in a {@code RuntimeException} (keeping it as the
 * cause). How {@code doWait} is used is not visible here.
 *
 * @param doWait presumably whether to wait for completion first -- confirm
 * @throws InterruptedException declared by the signature
 * @throws RuntimeException wrapping the recorded error
 */
178 public T result(boolean doWait)
179 throws InterruptedException, RuntimeException {
183 if (null == this.error)
186 throw new RuntimeException(this.error);
// Throws clause of a further method whose signature line and body are not
// visible here.
190 throws InterruptedException, RuntimeException {
/**
 * Waits for this handler to finish; body not visible here (presumably awaits
 * the latch -- confirm).
 *
 * @throws InterruptedException declared by the signature
 */
194 public BasicHandler<T> waitForCompletion() throws InterruptedException {
/**
 * A {@code BasicFuture<List<T>>} that collects the outcomes of several
 * {@code Future<T>} instances into one list, positionally aligned with the
 * order in which the futures were added.
 *
 * Usage as far as visible here: add() each future, then call accumulate()
 * to install the collecting handler on all of them.
 */
201 public static class Accumulator<T> extends BasicFuture<List<T>>
202 implements Future<List<T>> {
// The futures being collected, in insertion order.
204 protected List<Future<T>> futures = new LinkedList<Future<T>>();
205 //protected List<T> results = new LinkedList<T>();
// NOTE(review): this field hides BasicFuture.handler (a FutureHandler of the
// list type). Code inherited from BasicFuture keeps using its own field; this
// one only holds the collecting handler created in accumulate().
206 protected BasicHandler<T> handler = null;
// errLogger is not referenced in the lines visible here.
208 private static OnapLoggerError errLogger = OnapLoggerError.getInstance();
209 private static OnapLoggerDebug debugLogger = OnapLoggerDebug.getInstance();
/** Starts with an empty result list; add() appends one slot per future. */
211 public Accumulator() {
212 this.result = new LinkedList<T>();
/**
 * Registers a future and reserves its slot in the result list.
 *
 * @param theFuture the future to collect
 */
215 public Accumulator<T> add(Future<T> theFuture) {
216 debugLogger.log(LogLevel.DEBUG, this.getClass().getName(), "Intersection add");
217 this.futures.add(theFuture);
// Null placeholder so the result list has the same index as the future;
// accumulate() later overwrites it via set(indexOf(future), value).
218 this.result.add(null);
/**
 * NOTE(review): the only statement visible for this method is the debug log
 * call; whether the futures of {@code theFutures} are actually merged cannot
 * be told from the visible lines -- confirm.
 */
222 public Accumulator<T> addAll(Accumulator<T> theFutures) {
224 debugLogger.log(LogLevel.DEBUG, this.getClass().getName(), "Intersection addAll");
/**
 * Freezes the list of futures and installs one shared collecting handler on
 * each of them. The handler's latch starts at the number of futures.
 *
 * NOTE(review): with zero futures no handler is ever invoked, so the visible
 * code never completes this accumulator.
 * NOTE(review): the getCount() == 1 check and the later count-down are not
 * one atomic step; if futures can complete on different threads, the final
 * completion could be missed -- confirm the threading model.
 */
229 public Future<List<T>> accumulate() {
// After this point the futures list can no longer be modified through this field.
230 this.futures = Collections.unmodifiableList(this.futures);
231 this.handler = new BasicHandler<T>(new CountDownLatch(this.futures.size())) {
232 protected void process(Future<T> theResult) {
// A failure is only recorded on the field here; the accumulator itself is
// failed further down, once the last future reports.
233 if (theResult.failed()) {
234 Accumulator.this.cause = theResult.cause();
// Store the value in the slot reserved by add(). indexOf is a linear search
// and returns the first match, so a future added twice always maps to its
// first slot. (Presumably the else branch; the separating line is not shown.)
237 Accumulator.this.result.set(
238 Accumulator.this.futures.indexOf(theResult), theResult.result());
// A remaining count of 1 means this is the last outstanding future, given
// that handle() counts down after processing.
240 if (this.latch.getCount() == 1) {
241 if (Accumulator.this.cause != null)
242 Accumulator.this.cause(Accumulator.this.cause);
244 Accumulator.this.result(Accumulator.this.result);
// Install the shared handler on every collected future (the start of this
// statement is on a line not visible here).
249 .forEach(f -> f.setHandler(this.handler));