serializing OAM async task
[appc.git] / appc-oam / appc-oam-bundle / src / main / java / org / openecomp / appc / oam / util / AsyncTaskHelper.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.openecomp.appc.oam.util;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import org.openecomp.appc.oam.AppcOam;
29 import org.openecomp.appc.oam.processor.BaseActionRunnable;
30 import org.openecomp.appc.statemachine.impl.readers.AppcOamStates;
31 import org.osgi.framework.Bundle;
32 import org.osgi.framework.FrameworkUtil;
33
34 import java.util.HashSet;
35 import java.util.Set;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.CancellationException;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.LinkedBlockingQueue;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.TimeoutException;
46 import java.util.function.Consumer;
47
48 /**
49  * The AsyncTaskHelper class manages an internal parent child data structure.   The parent is a transient singleton,
50  * meaning only one can exist at any given time.     The parent is scheduled with the
51  * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and is executed at configured interval.   It can be
52  * terminated by using the {@link Future#cancel(boolean)} or the {@link Future#cancel(boolean)} returned from \
53  * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}.
54  * <p>
55  * The children are scheduled using {@link #submitBaseSubCallable(Callable)}} and can only be scheduled if a parent
56  * is scheduled.   Children only execute once, but can be terminated preemptively by the {@link Future#cancel(boolean)}
57  * returned from {@link #submitBaseSubCallable(Callable)} or indirectly by terminating the parent via the method
58  * described above.
59  * <p>
60  * This class augments the meaning of {@link Future#isDone()} in that it guarantees that this method only returns true
61  * if the scheduled {@link Runnable} or {@link Callable}  is not currently executing and is not going to execute in the
62  * future.   This is different than the Java core implementation of {@link Future#isDone()} in which it will return
63  * true immediately after the {@link Future#cancel(boolean)} is called. Even if a Thread is actively executing the
64  * {@link Runnable} or {@link Callable} and has not return yet. See Java BUG JDK-8073704
65  * <p>
66  * The parent {@link Future#isDone()} has an additional augmentation in that it will not return true until all of its
67  * children's {@link Future#isDone()} also return true.
68  *
69  */
70 @SuppressWarnings("unchecked")
71 public class AsyncTaskHelper {
72
73     private final EELFLogger logger;
74     private final ScheduledExecutorService scheduledExecutorService;
75     private final ThreadPoolExecutor bundleOperationService;
76
77     /** Reference to {@link MyFuture} return from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} */
78     private MyFuture backgroundBaseRunnableFuture;
79
80     /** The cancel Callback from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}   */
81     private Consumer<AppcOam.RPC> cancelCallBackForBaseRunnable;
82
83     /** All Futures created by thus calls which have not completed -- {@link Future#isDone()} equals false  */
84     private Set<MyFuture> myFutureSet = new HashSet<>();
85
86     /**
87      * Constructor
88      * @param eelfLogger of the logger
89      */
90     public AsyncTaskHelper(EELFLogger eelfLogger) {
91         logger = eelfLogger;
92
93         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
94             (runnable) -> {
95                 Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
96                 return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor");
97             }
98         );
99
100         bundleOperationService = new ThreadPoolExecutor(
101             0,
102             10,
103             10,
104             TimeUnit.SECONDS,
105             new LinkedBlockingQueue(), //BlockingQueue<Runnable> workQueue
106             (runnable) -> {
107                 Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
108                 return new Thread(runnable, bundle.getSymbolicName() + " bundle operation executor");
109             }
110         );
111     }
112
113     /**
114      * Terminate the class <bS>ScheduledExecutorService</b>
115      */
116     public void close() {
117         logDebug("Start shutdown scheduleExcutorService.");
118         bundleOperationService.shutdownNow();
119         scheduledExecutorService.shutdownNow();
120         logDebug("Completed shutdown scheduleExcutorService.");
121     }
122
123
124     /**
125      * Cancel currently executing {@link BaseActionRunnable} if any.
126      * This method returns immediately if there is currently no {@link BaseActionRunnable} actively executing.
127      * @param rpcCausingAbort - The RPC causing the abort
128      * @param stateBeingAbborted - The current state being canceled
129      * @param timeout - The amount of time to wait for a cancel to complete
130      * @param timeUnit - The unit of time of timeout
131      * @throws TimeoutException - If {@link BaseActionRunnable} has not completely cancelled within the timeout period
132      * @throws InterruptedException - If the Thread waiting for the abort
133      */
134     public synchronized void cancelBaseActionRunnable(final AppcOam.RPC rpcCausingAbort,
135                                                       AppcOamStates stateBeingAbborted,
136                                                       long timeout, TimeUnit timeUnit)
137         throws TimeoutException,InterruptedException {
138
139         final MyFuture localBackgroundBaseRunnableFuture = backgroundBaseRunnableFuture;
140         final Consumer<AppcOam.RPC> localCancelCallBackForBaseRunnable = cancelCallBackForBaseRunnable;
141
142         if (localBackgroundBaseRunnableFuture == null || localBackgroundBaseRunnableFuture.isDone()) {
143           return;
144         }
145
146         if (localCancelCallBackForBaseRunnable != null) {
147             localCancelCallBackForBaseRunnable.accept(rpcCausingAbort);
148         }
149         localBackgroundBaseRunnableFuture.cancel(true);
150
151         long timeoutMillis = timeUnit.toMillis(timeout);
152         long expiryTime = System.currentTimeMillis() + timeoutMillis;
153         while (!(localBackgroundBaseRunnableFuture.isDone())) {
154             long sleepTime = expiryTime - System.currentTimeMillis();
155             if (sleepTime < 1) {
156                 break;
157             }
158             this.wait(sleepTime);
159         }
160
161         if (!localBackgroundBaseRunnableFuture.isDone()) {
162             throw new TimeoutException(String.format("Unable to abort %s in timely manner.",stateBeingAbborted));
163         }
164     }
165
166     /**
167      * Schedule a {@link BaseActionRunnable} to begin async execution.   This is the Parent  {@link Runnable} for the
168      * children that are submitted by {@link #submitBaseSubCallable(Callable)}
169      *
170      * The currently executing {@link BaseActionRunnable} must fully be terminated before the next can be scheduled.
171      * This means all Tasks' {@link MyFuture#isDone()} must equal true and all threads must return to their respective
172      * thread pools.
173      *
174      * @param runnable of the to be scheduled service.
175      * @param cancelCallBack to be invoked when
176      *        {@link #cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)} is invoked.
177      * @param initialDelayMillis the time to delay first execution
178      * @param delayMillis the delay between the termination of one
179      * execution and the commencement of the next
180      * @return The {@link BaseActionRunnable}'s {@link Future}
181      * @throws IllegalStateException if there is currently executing Task
182      */
183     public synchronized Future<?> scheduleBaseRunnable(final Runnable runnable,
184                                                        final Consumer<AppcOam.RPC> cancelCallBack,
185                                                        long initialDelayMillis,
186                                                        long delayMillis)
187         throws IllegalStateException {
188
189         if (backgroundBaseRunnableFuture != null && !backgroundBaseRunnableFuture.isDone()) {
190             throw new IllegalStateException("Unable to schedule background task when one is already running.  All task must fully terminated before another can be scheduled. ");
191         }
192
193         this.cancelCallBackForBaseRunnable = cancelCallBack;
194
195         backgroundBaseRunnableFuture = new MyFuture(runnable) {
196             /**
197              * augments the cancel operation to cancel all subTack too,
198              */
199             @Override
200             public boolean cancel(final boolean mayInterruptIfRunning) {
201                 boolean cancel;
202                 synchronized (AsyncTaskHelper.this) {
203                     cancel = super.cancel(mayInterruptIfRunning);
204                     myFutureSet.stream().filter(f->!this.equals(f)).forEach(f->f.cancel(mayInterruptIfRunning));
205                 }
206                 return cancel;
207             }
208
209             /**
210              * augments the isDone operation to return false until all subTask have completed too.
211              */
212             @Override
213             public boolean isDone() {
214                 synchronized (AsyncTaskHelper.this) {
215                     return myFutureSet.isEmpty();
216                 }
217             }
218         };
219         backgroundBaseRunnableFuture.setFuture(
220             scheduledExecutorService.scheduleWithFixedDelay(
221                 backgroundBaseRunnableFuture, initialDelayMillis, delayMillis, TimeUnit.MILLISECONDS)
222         );
223         return backgroundBaseRunnableFuture;
224     }
225
226     /**
227      * Submits children {@link Callable} to be executed as soon as possible,  A parent must have been scheduled
228      * previously via {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}
229      * @param callable the Callable to be submitted
230      * @return The {@link Callable}'s {@link Future}
231      */
232     synchronized Future<?> submitBaseSubCallable(final Callable callable) {
233
234         if (backgroundBaseRunnableFuture == null
235             || backgroundBaseRunnableFuture.isCancelled()
236             || backgroundBaseRunnableFuture.isDone()){
237             throw new IllegalStateException("Unable to schedule subCallable when a base Runnable is not running.");
238         }
239
240         //Make sure the pool is ready to go
241         if(bundleOperationService.getPoolSize() != bundleOperationService.getMaximumPoolSize()){
242             bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize());
243             bundleOperationService.prestartAllCoreThreads();
244             bundleOperationService.setCorePoolSize(0);
245         }
246
247         MyFuture<?> myFuture = new MyFuture(callable);
248         myFuture.setFuture(bundleOperationService.submit((Callable)myFuture));
249         return myFuture;
250     }
251
252     /**
253      * Genral debug log when debug logging level is enabled.
254      * @param message of the log message format
255      * @param args of the objects listed in the message format
256      */
257     private void logDebug(String message, Object... args) {
258         if (logger.isDebugEnabled()) {
259             logger.debug(String.format(message, args));
260         }
261     }
262
263     /**
264      * This class has two purposes.  First it insures  {@link #isDone()} only returns true if the deligate is not
265      * currently running and will not be running in the future: See Java BUG JDK-8073704 Second this class maintains
266      * the {@link #myFutureSet } by insurring that itself is removed when  {@link #isDone()} returns true.
267      *
268      * See {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and {@link #submitBaseSubCallable(Callable)}
269      * for usage of this class
270      */
271     private class MyFuture<T> implements Future<T>, Runnable, Callable<T> {
272
273         private Future<T> future;
274         private final Runnable runnable;
275         private final Callable<T> callable;
276         private boolean isRunning;
277
278         MyFuture(Runnable runnable) {
279             this.runnable = runnable;
280             this.callable = null;
281             myFutureSet.add(this);
282         }
283
284         MyFuture(Callable<T> callable) {
285             this.runnable = null;
286             this.callable = callable;
287             myFutureSet.add(this);
288         }
289
290         void setFuture(Future<T> future) {
291             this.future = future;
292         }
293
294         @Override
295         public boolean cancel(boolean mayInterruptIfRunning) {
296             synchronized (AsyncTaskHelper.this) {
297                 if (!isRunning) {
298                     myFutureSetRemove();
299                 }
300
301                 return future.cancel(mayInterruptIfRunning);
302             }
303         }
304
305         @Override
306         public boolean isCancelled() {
307             synchronized (AsyncTaskHelper.this) {
308                 return future.isCancelled();
309             }
310         }
311
312         @Override
313         public boolean isDone() {
314             synchronized (AsyncTaskHelper.this) {
315                 return future.isDone() && !isRunning;
316             }
317         }
318
319         @Override
320         public T get() throws InterruptedException, ExecutionException {
321                 return future.get();
322         }
323
324         @Override
325         public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
326             return future.get(timeout, unit);
327         }
328
329         @Override
330         public void run() {
331             synchronized (AsyncTaskHelper.this) {
332                 if(future.isCancelled()){
333                     return;
334                 }
335                 isRunning = true;
336             }
337             try {
338                 runnable.run();
339             } finally {
340                 synchronized (AsyncTaskHelper.this) {
341                     isRunning = false;
342
343                     //The Base Runnable is expected to run again.
344                     //unless it has been canceled.
345                     //so only removed if it is canceled.
346                     if (future.isCancelled()) {
347                         myFutureSetRemove();
348                     }
349                 }
350             }
351         }
352
353         @Override
354         public T call() throws Exception {
355             synchronized (AsyncTaskHelper.this) {
356                 if(future.isCancelled()){
357                     throw new CancellationException();
358                 }
359                 isRunning = true;
360             }
361             try {
362                 return callable.call();
363             } finally {
364                 synchronized (AsyncTaskHelper.this){
365                     isRunning = false;
366                     myFutureSetRemove();
367                 }
368             }
369         }
370
371
372         /**
373          * Removes this from the the myFutureSet.
374          * When all the BaseActionRunnable is Done notify any thread waiting in
375          * {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
376          */
377         void myFutureSetRemove(){
378             synchronized (AsyncTaskHelper.this) {
379                 myFutureSet.remove(this);
380                 if(myFutureSet.isEmpty()){
381                     backgroundBaseRunnableFuture = null;
382                     cancelCallBackForBaseRunnable = null;
383                     AsyncTaskHelper.this.notifyAll();
384
385                 }
386             }
387         }
388
389     }
390 }