2f74bf5a9bdcda8a88525979a2c3747553d48eea
[appc.git] / appc-oam / appc-oam-bundle / src / main / java / org / openecomp / appc / oam / util / AsyncTaskHelper.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.oam.util;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import org.onap.appc.oam.AppcOam;
29 import org.onap.appc.oam.processor.BaseActionRunnable;
30 import org.onap.appc.statemachine.impl.readers.AppcOamStates;
31 import org.osgi.framework.Bundle;
32 import org.osgi.framework.FrameworkUtil;
33
34 import java.util.HashSet;
35 import java.util.Set;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.CancellationException;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.LinkedBlockingQueue;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.TimeoutException;
46 import java.util.function.Consumer;
47
48 /**
49  * The AsyncTaskHelper class manages an internal parent child data structure.   The parent is a transient singleton,
50  * meaning only one can exist at any given time.     The parent is scheduled with the
51  * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and is executed at configured interval.   It can be
52  * terminated by using the {@link Future#cancel(boolean)} or the {@link Future#cancel(boolean)} returned from \
53  * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}.
54  * <p>
55  * The children are scheduled using {@link #submitBaseSubCallable(Callable)}} and can only be scheduled if a parent
56  * is scheduled.   Children only execute once, but can be terminated preemptively by the {@link Future#cancel(boolean)}
57  * returned from {@link #submitBaseSubCallable(Callable)} or indirectly by terminating the parent via the method
58  * described above.
59  * <p>
60  * This class augments the meaning of {@link Future#isDone()} in that it guarantees that this method only returns true
61  * if the scheduled {@link Runnable} or {@link Callable}  is not currently executing and is not going to execute in the
62  * future.   This is different than the Java core implementation of {@link Future#isDone()} in which it will return
63  * true immediately after the {@link Future#cancel(boolean)} is called. Even if a Thread is actively executing the
64  * {@link Runnable} or {@link Callable} and has not return yet. See Java BUG JDK-8073704
65  * <p>
66  * The parent {@link Future#isDone()} has an additional augmentation in that it will not return true until all of its
67  * children's {@link Future#isDone()} also return true.
68  *
69  */
70 @SuppressWarnings("unchecked")
71 public class AsyncTaskHelper {
72
73     private final EELFLogger logger;
74     private final ScheduledExecutorService scheduledExecutorService;
75     private final ThreadPoolExecutor bundleOperationService;
76
77     /** Reference to {@link MyFuture} return from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} */
78     private MyFuture backgroundBaseRunnableFuture;
79
80     /** The cancel Callback from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}   */
81     private Consumer<AppcOam.RPC> cancelCallBackForBaseRunnable;
82
83     /** All Futures created by thus calls which have not completed -- {@link Future#isDone()} equals false  */
84     private Set<MyFuture> myFutureSet = new HashSet<>();
85
86     /**
87      * Constructor
88      * @param eelfLogger of the logger
89      */
90     public AsyncTaskHelper(EELFLogger eelfLogger) {
91         logger = eelfLogger;
92
93         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
94             (runnable) -> {
95                 Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
96                 return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor");
97             }
98         );
99
100         bundleOperationService = new ThreadPoolExecutor(
101             0,
102             10,
103             10,
104             TimeUnit.SECONDS,
105             new LinkedBlockingQueue(), //BlockingQueue<Runnable> workQueue
106             (runnable) -> {
107                 Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
108                 return new Thread(runnable, bundle.getSymbolicName() + " bundle operation executor");
109             }
110         );
111     }
112
113     /**
114      * Terminate the class <bS>ScheduledExecutorService</b>
115      */
116     public void close() {
117         logDebug("Start shutdown scheduleExcutorService.");
118         bundleOperationService.shutdownNow();
119         scheduledExecutorService.shutdownNow();
120         logDebug("Completed shutdown scheduleExcutorService.");
121     }
122
123
124     /**
125      * Cancel currently executing {@link BaseActionRunnable} if any.
126      * This method returns immediately if there is currently no {@link BaseActionRunnable} actively executing.
127      * @param rpcCausingAbort - The RPC causing the abort
128      * @param stateBeingAbborted - The current state being canceled
129      * @param timeout - The amount of time to wait for a cancel to complete
130      * @param timeUnit - The unit of time of timeout
131      * @throws TimeoutException - If {@link BaseActionRunnable} has not completely cancelled within the timeout period
132      * @throws InterruptedException - If the Thread waiting for the abort
133      */
134     public synchronized void cancelBaseActionRunnable(final AppcOam.RPC rpcCausingAbort,
135                                                       AppcOamStates stateBeingAbborted,
136                                                       long timeout, TimeUnit timeUnit)
137         throws TimeoutException,InterruptedException {
138
139         final MyFuture localBackgroundBaseRunnableFuture = backgroundBaseRunnableFuture;
140         final Consumer<AppcOam.RPC> localCancelCallBackForBaseRunnable = cancelCallBackForBaseRunnable;
141
142         if (localBackgroundBaseRunnableFuture == null || localBackgroundBaseRunnableFuture.isDone()) {
143           return;
144         }
145
146         if (localCancelCallBackForBaseRunnable != null) {
147             localCancelCallBackForBaseRunnable.accept(rpcCausingAbort);
148         }
149         localBackgroundBaseRunnableFuture.cancel(true);
150
151         long timeoutMillis = timeUnit.toMillis(timeout);
152         long expiryTime = System.currentTimeMillis() + timeoutMillis;
153         while (!(localBackgroundBaseRunnableFuture.isDone())) {
154             long sleepTime = expiryTime - System.currentTimeMillis();
155             if (sleepTime < 1) {
156                 break;
157             }
158             this.wait(sleepTime);
159         }
160
161         if (!localBackgroundBaseRunnableFuture.isDone()) {
162             throw new TimeoutException(String.format("Unable to abort %s in timely manner.",stateBeingAbborted));
163         }
164     }
165
166     /**
167      * Schedule a {@link BaseActionRunnable} to begin async execution.   This is the Parent  {@link Runnable} for the
168      * children that are submitted by {@link #submitBaseSubCallable(Callable)}
169      *
170      * The currently executing {@link BaseActionRunnable} must fully be terminated before the next can be scheduled.
171      * This means all Tasks' {@link MyFuture#isDone()} must equal true and all threads must return to their respective
172      * thread pools.
173      *
174      * @param runnable of the to be scheduled service.
175      * @param cancelCallBack to be invoked when
176      *        {@link #cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)} is invoked.
177      * @param initialDelayMillis the time to delay first execution
178      * @param delayMillis the delay between the termination of one
179      * execution and the commencement of the next
180      * @return The {@link BaseActionRunnable}'s {@link Future}
181      * @throws IllegalStateException if there is currently executing Task
182      */
183     public synchronized Future<?> scheduleBaseRunnable(final Runnable runnable,
184                                                        final Consumer<AppcOam.RPC> cancelCallBack,
185                                                        long initialDelayMillis,
186                                                        long delayMillis)
187         throws IllegalStateException {
188
189         if (backgroundBaseRunnableFuture != null && !backgroundBaseRunnableFuture.isDone()) {
190             throw new IllegalStateException("Unable to schedule background task when one is already running.  All task must fully terminated before another can be scheduled. ");
191         }
192
193         this.cancelCallBackForBaseRunnable = cancelCallBack;
194
195         backgroundBaseRunnableFuture = new MyFuture(runnable) {
196             /**
197              * augments the cancel operation to cancel all subTack too,
198              */
199             @Override
200             public boolean cancel(final boolean mayInterruptIfRunning) {
201                 boolean cancel;
202                 synchronized (AsyncTaskHelper.this) {
203                     cancel = super.cancel(mayInterruptIfRunning);
204                     //clone the set to prevent java.util.ConcurrentModificationException.  The  synchronized prevents
205                     //other threads from modifying this set, but not itself.  The  f->f.cancel may modify myFutureSet by
206                     //removing an entry which breaks the iteration in the forEach.
207                     (new HashSet<MyFuture>(myFutureSet))
208                             .stream().filter(f->!this.equals(f)).forEach(f->f.cancel(mayInterruptIfRunning));
209                 }
210                 return cancel;
211             }
212
213             /**
214              * augments the isDone operation to return false until all subTask have completed too.
215              */
216             @Override
217             public boolean isDone() {
218                 synchronized (AsyncTaskHelper.this) {
219                     return myFutureSet.isEmpty();
220                 }
221             }
222         };
223         backgroundBaseRunnableFuture.setFuture(
224             scheduledExecutorService.scheduleWithFixedDelay(
225                 backgroundBaseRunnableFuture, initialDelayMillis, delayMillis, TimeUnit.MILLISECONDS)
226         );
227         return backgroundBaseRunnableFuture;
228     }
229
230     /**
231      * Submits children {@link Callable} to be executed as soon as possible,  A parent must have been scheduled
232      * previously via {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}
233      * @param callable the Callable to be submitted
234      * @return The {@link Callable}'s {@link Future}
235      */
236     synchronized Future<?> submitBaseSubCallable(final Callable callable) {
237
238         if (backgroundBaseRunnableFuture == null
239             || backgroundBaseRunnableFuture.isCancelled()
240             || backgroundBaseRunnableFuture.isDone()){
241             throw new IllegalStateException("Unable to schedule subCallable when a base Runnable is not running.");
242         }
243
244         //Make sure the pool is ready to go
245         if(bundleOperationService.getPoolSize() != bundleOperationService.getMaximumPoolSize()){
246             bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize());
247             bundleOperationService.prestartAllCoreThreads();
248             bundleOperationService.setCorePoolSize(0);
249         }
250
251         MyFuture<?> myFuture = new MyFuture(callable);
252         myFuture.setFuture(bundleOperationService.submit((Callable)myFuture));
253         return myFuture;
254     }
255
256     /**
257      * Genral debug log when debug logging level is enabled.
258      * @param message of the log message format
259      * @param args of the objects listed in the message format
260      */
261     private void logDebug(String message, Object... args) {
262         if (logger.isDebugEnabled()) {
263             logger.debug(String.format(message, args));
264         }
265     }
266
267     /**
268      * This class has two purposes.  First it insures  {@link #isDone()} only returns true if the deligate is not
269      * currently running and will not be running in the future: See Java BUG JDK-8073704 Second this class maintains
270      * the {@link #myFutureSet } by insurring that itself is removed when  {@link #isDone()} returns true.
271      *
272      * See {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and {@link #submitBaseSubCallable(Callable)}
273      * for usage of this class
274      */
275     private class MyFuture<T> implements Future<T>, Runnable, Callable<T> {
276
277         private Future<T> future;
278         private final Runnable runnable;
279         private final Callable<T> callable;
280         private boolean isRunning;
281
282         MyFuture(Runnable runnable) {
283             this.runnable = runnable;
284             this.callable = null;
285             myFutureSet.add(this);
286         }
287
288         MyFuture(Callable<T> callable) {
289             this.runnable = null;
290             this.callable = callable;
291             myFutureSet.add(this);
292         }
293
294         void setFuture(Future<T> future) {
295             this.future = future;
296         }
297
298         @Override
299         public boolean cancel(boolean mayInterruptIfRunning) {
300             synchronized (AsyncTaskHelper.this) {
301                 if (!isRunning) {
302                     myFutureSetRemove();
303                 }
304
305                 return future.cancel(mayInterruptIfRunning);
306             }
307         }
308
309         @Override
310         public boolean isCancelled() {
311             synchronized (AsyncTaskHelper.this) {
312                 return future.isCancelled();
313             }
314         }
315
316         @Override
317         public boolean isDone() {
318             synchronized (AsyncTaskHelper.this) {
319                 return future.isDone() && !isRunning;
320             }
321         }
322
323         @Override
324         public T get() throws InterruptedException, ExecutionException {
325                 return future.get();
326         }
327
328         @Override
329         public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
330             return future.get(timeout, unit);
331         }
332
333         @Override
334         public void run() {
335             synchronized (AsyncTaskHelper.this) {
336                 if(future.isCancelled()){
337                     return;
338                 }
339                 isRunning = true;
340             }
341             try {
342                 runnable.run();
343             } finally {
344                 synchronized (AsyncTaskHelper.this) {
345                     isRunning = false;
346
347                     //The Base Runnable is expected to run again.
348                     //unless it has been canceled.
349                     //so only removed if it is canceled.
350                     if (future.isCancelled()) {
351                         myFutureSetRemove();
352                     }
353                 }
354             }
355         }
356
357         @Override
358         public T call() throws Exception {
359             synchronized (AsyncTaskHelper.this) {
360                 if(future.isCancelled()){
361                     throw new CancellationException();
362                 }
363                 isRunning = true;
364             }
365             try {
366                 return callable.call();
367             } finally {
368                 synchronized (AsyncTaskHelper.this){
369                     isRunning = false;
370                     myFutureSetRemove();
371                 }
372             }
373         }
374
375
376         /**
377          * Removes this from the the myFutureSet.
378          * When all the BaseActionRunnable is Done notify any thread waiting in
379          * {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
380          */
381         void myFutureSetRemove(){
382             synchronized (AsyncTaskHelper.this) {
383                 myFutureSet.remove(this);
384                 if(myFutureSet.isEmpty()){
385                     backgroundBaseRunnableFuture = null;
386                     cancelCallBackForBaseRunnable = null;
387                     AsyncTaskHelper.this.notifyAll();
388
389                 }
390             }
391         }
392
393     }
394 }