serializing OAM async task
[appc.git] / appc-oam / appc-oam-bundle / src / main / java / org / openecomp / appc / oam / util / AsyncTaskHelper.java
index db60337..0a4b868 100644 (file)
@@ -27,34 +27,61 @@ package org.openecomp.appc.oam.util;
 import com.att.eelf.configuration.EELFLogger;
 import org.openecomp.appc.oam.AppcOam;
 import org.openecomp.appc.oam.processor.BaseActionRunnable;
+import org.openecomp.appc.statemachine.impl.readers.AppcOamStates;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.FrameworkUtil;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
 
 /**
- * Utility class provides general async task related help.
+ * The AsyncTaskHelper class manages an internal parent child data structure.   The parent is a transient singleton,
+ * meaning only one can exist at any given time.     The parent is scheduled with the
+ * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and is executed at configured interval.   It can be
+ * terminated by using the {@link Future#cancel(boolean)} or the {@link Future#cancel(boolean)} returned from \
+ * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}.
+ * <p>
+ * The children are scheduled using {@link #submitBaseSubCallable(Callable)}} and can only be scheduled if a parent
+ * is scheduled.   Children only execute once, but can be terminated preemptively by the {@link Future#cancel(boolean)}
+ * returned from {@link #submitBaseSubCallable(Callable)} or indirectly by terminating the parent via the method
+ * described above.
+ * <p>
+ * This class augments the meaning of {@link Future#isDone()} in that it guarantees that this method only returns true
+ * if the scheduled {@link Runnable} or {@link Callable}  is not currently executing and is not going to execute in the
+ * future.   This is different than the Java core implementation of {@link Future#isDone()} in which it will return
+ * true immediately after the {@link Future#cancel(boolean)} is called. Even if a Thread is actively executing the
+ * {@link Runnable} or {@link Callable} and has not return yet. See Java BUG JDK-8073704
+ * <p>
+ * The parent {@link Future#isDone()} has an additional augmentation in that it will not return true until all of its
+ * children's {@link Future#isDone()} also return true.
+ *
  */
 @SuppressWarnings("unchecked")
 public class AsyncTaskHelper {
-    final int MMODE_TASK_DELAY = 10000;
-    final int COMMON_INITIAL_DELAY = 0;
-    final int COMMON_INTERVAL = 1000;
 
     private final EELFLogger logger;
     private final ScheduledExecutorService scheduledExecutorService;
     private final ThreadPoolExecutor bundleOperationService;
 
-    /** Reference to the Async task */
-    private volatile Future<?> backgroundOamTask;
-    /** Reference to the runnable of Async task */
-    private volatile BaseActionRunnable taskRunnable;
+    /** Reference to {@link MyFuture} return from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} */
+    private MyFuture backgroundBaseRunnableFuture;
+
+    /** The cancel Callback from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}   */
+    private Consumer<AppcOam.RPC> cancelCallBackForBaseRunnable;
+
+    /** All Futures created by thus calls which have not completed -- {@link Future#isDone()} equals false  */
+    private Set<MyFuture> myFutureSet = new HashSet<>();
 
     /**
      * Constructor
@@ -64,103 +91,162 @@ public class AsyncTaskHelper {
         logger = eelfLogger;
 
         scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
-                (runnable) -> {
-                    Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
-                    return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor");
-                }
+            (runnable) -> {
+                Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
+                return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor");
+            }
         );
 
         bundleOperationService = new ThreadPoolExecutor(
-                0,
-                10,
-                10,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue(),// BlockingQueue<Runnable> workQueue
-                (runnable) -> new Thread(runnable, "OAM bundler operation executor")//ThreadFactory
+            0,
+            10,
+            10,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue(), //BlockingQueue<Runnable> workQueue
+            (runnable) -> {
+                Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
+                return new Thread(runnable, bundle.getSymbolicName() + " bundle operation executor");
+            }
         );
     }
 
-    void addThreadsToPool() {
-        bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize());
-    }
-
-    void removeThreadsFromPoolWhenDone() {
-        bundleOperationService.setCorePoolSize(0);
-    }
-
     /**
      * Terminate the class <bS>ScheduledExecutorService</b>
      */
     public void close() {
         logDebug("Start shutdown scheduleExcutorService.");
-        scheduledExecutorService.shutdown();
-        bundleOperationService.shutdown();
+        bundleOperationService.shutdownNow();
+        scheduledExecutorService.shutdownNow();
         logDebug("Completed shutdown scheduleExcutorService.");
     }
 
+
     /**
-     * Get current async task refernce
-     * @return the class <b>backgroundOamTask</b>
+     * Cancel currently executing {@link BaseActionRunnable} if any.
+     * This method returns immediately if there is currently no {@link BaseActionRunnable} actively executing.
+     * @param rpcCausingAbort - The RPC causing the abort
+     * @param stateBeingAbborted - The current state being canceled
+     * @param timeout - The amount of time to wait for a cancel to complete
+     * @param timeUnit - The unit of time of timeout
+     * @throws TimeoutException - If {@link BaseActionRunnable} has not completely cancelled within the timeout period
+     * @throws InterruptedException - If the Thread waiting for the abort
      */
-    public Future<?> getCurrentAsyncTask() {
-        return backgroundOamTask;
+    public synchronized void cancelBaseActionRunnable(final AppcOam.RPC rpcCausingAbort,
+                                                      AppcOamStates stateBeingAbborted,
+                                                      long timeout, TimeUnit timeUnit)
+        throws TimeoutException,InterruptedException {
+
+        final MyFuture localBackgroundBaseRunnableFuture = backgroundBaseRunnableFuture;
+        final Consumer<AppcOam.RPC> localCancelCallBackForBaseRunnable = cancelCallBackForBaseRunnable;
+
+        if (localBackgroundBaseRunnableFuture == null || localBackgroundBaseRunnableFuture.isDone()) {
+          return;
+        }
+
+        if (localCancelCallBackForBaseRunnable != null) {
+            localCancelCallBackForBaseRunnable.accept(rpcCausingAbort);
+        }
+        localBackgroundBaseRunnableFuture.cancel(true);
+
+        long timeoutMillis = timeUnit.toMillis(timeout);
+        long expiryTime = System.currentTimeMillis() + timeoutMillis;
+        while (!(localBackgroundBaseRunnableFuture.isDone())) {
+            long sleepTime = expiryTime - System.currentTimeMillis();
+            if (sleepTime < 1) {
+                break;
+            }
+            this.wait(sleepTime);
+        }
+
+        if (!localBackgroundBaseRunnableFuture.isDone()) {
+            throw new TimeoutException(String.format("Unable to abort %s in timely manner.",stateBeingAbborted));
+        }
     }
 
     /**
-     * Schedule a service for async task with the passed in parameters
-     * @param rpc of the REST API call, decides how to schedule the service
+     * Schedule a {@link BaseActionRunnable} to begin async execution.   This is the Parent  {@link Runnable} for the
+     * children that are submitted by {@link #submitBaseSubCallable(Callable)}
+     *
+     * The currently executing {@link BaseActionRunnable} must fully be terminated before the next can be scheduled.
+     * This means all Tasks' {@link MyFuture#isDone()} must equal true and all threads must return to their respective
+     * thread pools.
+     *
      * @param runnable of the to be scheduled service.
-     * @return the reference of the scheduled task
+     * @param cancelCallBack to be invoked when
+     *        {@link #cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)} is invoked.
+     * @param initialDelayMillis the time to delay first execution
+     * @param delayMillis the delay between the termination of one
+     * execution and the commencement of the next
+     * @return The {@link BaseActionRunnable}'s {@link Future}
+     * @throws IllegalStateException if there is currently executing Task
      */
-    public Future<?> scheduleAsyncTask(final AppcOam.RPC rpc, final BaseActionRunnable runnable) {
-        int initialDelay, interval;
-        switch (rpc) {
-            case maintenance_mode:
-                initialDelay = interval =MMODE_TASK_DELAY;
-                break;
-            case start:
-            case stop:
-            case restart:
-                initialDelay = COMMON_INITIAL_DELAY;
-                interval = COMMON_INTERVAL;
-                break;
-            default:
-                // should not get here. Log it and return null
-                logDebug(String.format("Cannot scheudle task for unsupported RPC(%s).", rpc.name()));
-                return null;
-        }
-
-        // Always cancel existing  async task
-        if (backgroundOamTask != null) {
-            logDebug("Cancelling background task in schedule task.");
-            backgroundOamTask.cancel(true);
-            if (taskRunnable != null) {
-                taskRunnable.abortRunnable(rpc);
-            }
+    public synchronized Future<?> scheduleBaseRunnable(final Runnable runnable,
+                                                       final Consumer<AppcOam.RPC> cancelCallBack,
+                                                       long initialDelayMillis,
+                                                       long delayMillis)
+        throws IllegalStateException {
+
+        if (backgroundBaseRunnableFuture != null && !backgroundBaseRunnableFuture.isDone()) {
+            throw new IllegalStateException("Unable to schedule background task when one is already running.  All task must fully terminated before another can be scheduled. ");
         }
 
-        taskRunnable = runnable;
-        backgroundOamTask = scheduledExecutorService.scheduleWithFixedDelay(
-                runnable, initialDelay, interval, TimeUnit.MILLISECONDS);
+        this.cancelCallBackForBaseRunnable = cancelCallBack;
 
-        return backgroundOamTask;
-    }
+        backgroundBaseRunnableFuture = new MyFuture(runnable) {
+            /**
+             * augments the cancel operation to cancel all subTack too,
+             */
+            @Override
+            public boolean cancel(final boolean mayInterruptIfRunning) {
+                boolean cancel;
+                synchronized (AsyncTaskHelper.this) {
+                    cancel = super.cancel(mayInterruptIfRunning);
+                    myFutureSet.stream().filter(f->!this.equals(f)).forEach(f->f.cancel(mayInterruptIfRunning));
+                }
+                return cancel;
+            }
 
-    Future<?> submitBundleLcOperation(final Callable callable) {
-        return bundleOperationService.submit(callable);
+            /**
+             * augments the isDone operation to return false until all subTask have completed too.
+             */
+            @Override
+            public boolean isDone() {
+                synchronized (AsyncTaskHelper.this) {
+                    return myFutureSet.isEmpty();
+                }
+            }
+        };
+        backgroundBaseRunnableFuture.setFuture(
+            scheduledExecutorService.scheduleWithFixedDelay(
+                backgroundBaseRunnableFuture, initialDelayMillis, delayMillis, TimeUnit.MILLISECONDS)
+        );
+        return backgroundBaseRunnableFuture;
     }
 
     /**
-     * Cancle a previously schedule task. If the task is the same as backgroundOamTask, set it to null.
-     * @param task to be canceled
+     * Submits children {@link Callable} to be executed as soon as possible,  A parent must have been scheduled
+     * previously via {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}
+     * @param callable the Callable to be submitted
+     * @return The {@link Callable}'s {@link Future}
      */
-    public void cancelAsyncTask(Future<?> task) {
-        task.cancel(false);
-        if (task == backgroundOamTask) {
-            backgroundOamTask = null;
-            taskRunnable = null;
-            logDebug("Cancelling background task in cancel task.");
+    synchronized Future<?> submitBaseSubCallable(final Callable callable) {
+
+        if (backgroundBaseRunnableFuture == null
+            || backgroundBaseRunnableFuture.isCancelled()
+            || backgroundBaseRunnableFuture.isDone()){
+            throw new IllegalStateException("Unable to schedule subCallable when a base Runnable is not running.");
+        }
+
+        //Make sure the pool is ready to go
+        if(bundleOperationService.getPoolSize() != bundleOperationService.getMaximumPoolSize()){
+            bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize());
+            bundleOperationService.prestartAllCoreThreads();
+            bundleOperationService.setCorePoolSize(0);
         }
+
+        MyFuture<?> myFuture = new MyFuture(callable);
+        myFuture.setFuture(bundleOperationService.submit((Callable)myFuture));
+        return myFuture;
     }
 
     /**
@@ -173,4 +259,132 @@ public class AsyncTaskHelper {
             logger.debug(String.format(message, args));
         }
     }
+
+    /**
+     * This class has two purposes.  First it insures  {@link #isDone()} only returns true if the deligate is not
+     * currently running and will not be running in the future: See Java BUG JDK-8073704 Second this class maintains
+     * the {@link #myFutureSet } by insurring that itself is removed when  {@link #isDone()} returns true.
+     *
+     * See {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and {@link #submitBaseSubCallable(Callable)}
+     * for usage of this class
+     */
+    private class MyFuture<T> implements Future<T>, Runnable, Callable<T> {
+
+        private Future<T> future;
+        private final Runnable runnable;
+        private final Callable<T> callable;
+        private boolean isRunning;
+
+        MyFuture(Runnable runnable) {
+            this.runnable = runnable;
+            this.callable = null;
+            myFutureSet.add(this);
+        }
+
+        MyFuture(Callable<T> callable) {
+            this.runnable = null;
+            this.callable = callable;
+            myFutureSet.add(this);
+        }
+
+        void setFuture(Future<T> future) {
+            this.future = future;
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            synchronized (AsyncTaskHelper.this) {
+                if (!isRunning) {
+                    myFutureSetRemove();
+                }
+
+                return future.cancel(mayInterruptIfRunning);
+            }
+        }
+
+        @Override
+        public boolean isCancelled() {
+            synchronized (AsyncTaskHelper.this) {
+                return future.isCancelled();
+            }
+        }
+
+        @Override
+        public boolean isDone() {
+            synchronized (AsyncTaskHelper.this) {
+                return future.isDone() && !isRunning;
+            }
+        }
+
+        @Override
+        public T get() throws InterruptedException, ExecutionException {
+                return future.get();
+        }
+
+        @Override
+        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return future.get(timeout, unit);
+        }
+
+        @Override
+        public void run() {
+            synchronized (AsyncTaskHelper.this) {
+                if(future.isCancelled()){
+                    return;
+                }
+                isRunning = true;
+            }
+            try {
+                runnable.run();
+            } finally {
+                synchronized (AsyncTaskHelper.this) {
+                    isRunning = false;
+
+                    //The Base Runnable is expected to run again.
+                    //unless it has been canceled.
+                    //so only removed if it is canceled.
+                    if (future.isCancelled()) {
+                        myFutureSetRemove();
+                    }
+                }
+            }
+        }
+
+        @Override
+        public T call() throws Exception {
+            synchronized (AsyncTaskHelper.this) {
+                if(future.isCancelled()){
+                    throw new CancellationException();
+                }
+                isRunning = true;
+            }
+            try {
+                return callable.call();
+            } finally {
+                synchronized (AsyncTaskHelper.this){
+                    isRunning = false;
+                    myFutureSetRemove();
+                }
+            }
+        }
+
+
+        /**
+         * Removes this from the the myFutureSet.
+         * When all the BaseActionRunnable is Done notify any thread waiting in
+         * {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
+         */
+        void myFutureSetRemove(){
+            synchronized (AsyncTaskHelper.this) {
+                myFutureSet.remove(this);
+                if(myFutureSet.isEmpty()){
+                    backgroundBaseRunnableFuture = null;
+                    cancelCallBackForBaseRunnable = null;
+                    AsyncTaskHelper.this.notifyAll();
+
+                }
+            }
+        }
+
+    }
 }