import com.att.eelf.configuration.EELFLogger;
import org.openecomp.appc.oam.AppcOam;
import org.openecomp.appc.oam.processor.BaseActionRunnable;
+import org.openecomp.appc.statemachine.impl.readers.AppcOamStates;
import org.osgi.framework.Bundle;
import org.osgi.framework.FrameworkUtil;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
/**
- * Utility class provides general async task related help.
+ * The AsyncTaskHelper class manages an internal parent child data structure. The parent is a transient singleton,
+ * meaning only one can exist at any given time. The parent is scheduled with the
+ * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and is executed at configured interval. It can be
+ * terminated by using the {@link Future#cancel(boolean)} or the {@link Future#cancel(boolean)} returned from \
+ * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}.
+ * <p>
+ * The children are scheduled using {@link #submitBaseSubCallable(Callable)}} and can only be scheduled if a parent
+ * is scheduled. Children only execute once, but can be terminated preemptively by the {@link Future#cancel(boolean)}
+ * returned from {@link #submitBaseSubCallable(Callable)} or indirectly by terminating the parent via the method
+ * described above.
+ * <p>
+ * This class augments the meaning of {@link Future#isDone()} in that it guarantees that this method only returns true
+ * if the scheduled {@link Runnable} or {@link Callable} is not currently executing and is not going to execute in the
+ * future. This is different than the Java core implementation of {@link Future#isDone()} in which it will return
+ * true immediately after the {@link Future#cancel(boolean)} is called. Even if a Thread is actively executing the
+ * {@link Runnable} or {@link Callable} and has not return yet. See Java BUG JDK-8073704
+ * <p>
+ * The parent {@link Future#isDone()} has an additional augmentation in that it will not return true until all of its
+ * children's {@link Future#isDone()} also return true.
+ *
*/
@SuppressWarnings("unchecked")
public class AsyncTaskHelper {
- final int MMODE_TASK_DELAY = 10000;
- final int COMMON_INITIAL_DELAY = 0;
- final int COMMON_INTERVAL = 1000;
private final EELFLogger logger;
private final ScheduledExecutorService scheduledExecutorService;
private final ThreadPoolExecutor bundleOperationService;
- /** Reference to the Async task */
- private volatile Future<?> backgroundOamTask;
- /** Reference to the runnable of Async task */
- private volatile BaseActionRunnable taskRunnable;
+ /** Reference to {@link MyFuture} return from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} */
+ private MyFuture backgroundBaseRunnableFuture;
+
+ /** The cancel Callback from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} */
+ private Consumer<AppcOam.RPC> cancelCallBackForBaseRunnable;
+
+ /** All Futures created by thus calls which have not completed -- {@link Future#isDone()} equals false */
+ private Set<MyFuture> myFutureSet = new HashSet<>();
/**
* Constructor
logger = eelfLogger;
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
- (runnable) -> {
- Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
- return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor");
- }
+ (runnable) -> {
+ Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
+ return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor");
+ }
);
bundleOperationService = new ThreadPoolExecutor(
- 0,
- 10,
- 10,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue(),// BlockingQueue<Runnable> workQueue
- (runnable) -> new Thread(runnable, "OAM bundler operation executor")//ThreadFactory
+ 0,
+ 10,
+ 10,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue(), //BlockingQueue<Runnable> workQueue
+ (runnable) -> {
+ Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
+ return new Thread(runnable, bundle.getSymbolicName() + " bundle operation executor");
+ }
);
}
- void addThreadsToPool() {
- bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize());
- }
-
- void removeThreadsFromPoolWhenDone() {
- bundleOperationService.setCorePoolSize(0);
- }
-
/**
* Terminate the class <bS>ScheduledExecutorService</b>
*/
public void close() {
logDebug("Start shutdown scheduleExcutorService.");
- scheduledExecutorService.shutdown();
- bundleOperationService.shutdown();
+ bundleOperationService.shutdownNow();
+ scheduledExecutorService.shutdownNow();
logDebug("Completed shutdown scheduleExcutorService.");
}
+
/**
- * Get current async task refernce
- * @return the class <b>backgroundOamTask</b>
+ * Cancel currently executing {@link BaseActionRunnable} if any.
+ * This method returns immediately if there is currently no {@link BaseActionRunnable} actively executing.
+ * @param rpcCausingAbort - The RPC causing the abort
+ * @param stateBeingAbborted - The current state being canceled
+ * @param timeout - The amount of time to wait for a cancel to complete
+ * @param timeUnit - The unit of time of timeout
+ * @throws TimeoutException - If {@link BaseActionRunnable} has not completely cancelled within the timeout period
+ * @throws InterruptedException - If the Thread waiting for the abort
*/
- public Future<?> getCurrentAsyncTask() {
- return backgroundOamTask;
+ public synchronized void cancelBaseActionRunnable(final AppcOam.RPC rpcCausingAbort,
+ AppcOamStates stateBeingAbborted,
+ long timeout, TimeUnit timeUnit)
+ throws TimeoutException,InterruptedException {
+
+ final MyFuture localBackgroundBaseRunnableFuture = backgroundBaseRunnableFuture;
+ final Consumer<AppcOam.RPC> localCancelCallBackForBaseRunnable = cancelCallBackForBaseRunnable;
+
+ if (localBackgroundBaseRunnableFuture == null || localBackgroundBaseRunnableFuture.isDone()) {
+ return;
+ }
+
+ if (localCancelCallBackForBaseRunnable != null) {
+ localCancelCallBackForBaseRunnable.accept(rpcCausingAbort);
+ }
+ localBackgroundBaseRunnableFuture.cancel(true);
+
+ long timeoutMillis = timeUnit.toMillis(timeout);
+ long expiryTime = System.currentTimeMillis() + timeoutMillis;
+ while (!(localBackgroundBaseRunnableFuture.isDone())) {
+ long sleepTime = expiryTime - System.currentTimeMillis();
+ if (sleepTime < 1) {
+ break;
+ }
+ this.wait(sleepTime);
+ }
+
+ if (!localBackgroundBaseRunnableFuture.isDone()) {
+ throw new TimeoutException(String.format("Unable to abort %s in timely manner.",stateBeingAbborted));
+ }
}
/**
- * Schedule a service for async task with the passed in parameters
- * @param rpc of the REST API call, decides how to schedule the service
+ * Schedule a {@link BaseActionRunnable} to begin async execution. This is the Parent {@link Runnable} for the
+ * children that are submitted by {@link #submitBaseSubCallable(Callable)}
+ *
+ * The currently executing {@link BaseActionRunnable} must fully be terminated before the next can be scheduled.
+ * This means all Tasks' {@link MyFuture#isDone()} must equal true and all threads must return to their respective
+ * thread pools.
+ *
* @param runnable of the to be scheduled service.
- * @return the reference of the scheduled task
+ * @param cancelCallBack to be invoked when
+ * {@link #cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)} is invoked.
+ * @param initialDelayMillis the time to delay first execution
+ * @param delayMillis the delay between the termination of one
+ * execution and the commencement of the next
+ * @return The {@link BaseActionRunnable}'s {@link Future}
+ * @throws IllegalStateException if there is currently executing Task
*/
- public Future<?> scheduleAsyncTask(final AppcOam.RPC rpc, final BaseActionRunnable runnable) {
- int initialDelay, interval;
- switch (rpc) {
- case maintenance_mode:
- initialDelay = interval =MMODE_TASK_DELAY;
- break;
- case start:
- case stop:
- case restart:
- initialDelay = COMMON_INITIAL_DELAY;
- interval = COMMON_INTERVAL;
- break;
- default:
- // should not get here. Log it and return null
- logDebug(String.format("Cannot scheudle task for unsupported RPC(%s).", rpc.name()));
- return null;
- }
-
- // Always cancel existing async task
- if (backgroundOamTask != null) {
- logDebug("Cancelling background task in schedule task.");
- backgroundOamTask.cancel(true);
- if (taskRunnable != null) {
- taskRunnable.abortRunnable(rpc);
- }
+ public synchronized Future<?> scheduleBaseRunnable(final Runnable runnable,
+ final Consumer<AppcOam.RPC> cancelCallBack,
+ long initialDelayMillis,
+ long delayMillis)
+ throws IllegalStateException {
+
+ if (backgroundBaseRunnableFuture != null && !backgroundBaseRunnableFuture.isDone()) {
+ throw new IllegalStateException("Unable to schedule background task when one is already running. All task must fully terminated before another can be scheduled. ");
}
- taskRunnable = runnable;
- backgroundOamTask = scheduledExecutorService.scheduleWithFixedDelay(
- runnable, initialDelay, interval, TimeUnit.MILLISECONDS);
+ this.cancelCallBackForBaseRunnable = cancelCallBack;
- return backgroundOamTask;
- }
+ backgroundBaseRunnableFuture = new MyFuture(runnable) {
+ /**
+ * augments the cancel operation to cancel all subTack too,
+ */
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ boolean cancel;
+ synchronized (AsyncTaskHelper.this) {
+ cancel = super.cancel(mayInterruptIfRunning);
+ myFutureSet.stream().filter(f->!this.equals(f)).forEach(f->f.cancel(mayInterruptIfRunning));
+ }
+ return cancel;
+ }
- Future<?> submitBundleLcOperation(final Callable callable) {
- return bundleOperationService.submit(callable);
+ /**
+ * augments the isDone operation to return false until all subTask have completed too.
+ */
+ @Override
+ public boolean isDone() {
+ synchronized (AsyncTaskHelper.this) {
+ return myFutureSet.isEmpty();
+ }
+ }
+ };
+ backgroundBaseRunnableFuture.setFuture(
+ scheduledExecutorService.scheduleWithFixedDelay(
+ backgroundBaseRunnableFuture, initialDelayMillis, delayMillis, TimeUnit.MILLISECONDS)
+ );
+ return backgroundBaseRunnableFuture;
}
/**
- * Cancle a previously schedule task. If the task is the same as backgroundOamTask, set it to null.
- * @param task to be canceled
+ * Submits children {@link Callable} to be executed as soon as possible, A parent must have been scheduled
+ * previously via {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}
+ * @param callable the Callable to be submitted
+ * @return The {@link Callable}'s {@link Future}
*/
- public void cancelAsyncTask(Future<?> task) {
- task.cancel(false);
- if (task == backgroundOamTask) {
- backgroundOamTask = null;
- taskRunnable = null;
- logDebug("Cancelling background task in cancel task.");
+ synchronized Future<?> submitBaseSubCallable(final Callable callable) {
+
+ if (backgroundBaseRunnableFuture == null
+ || backgroundBaseRunnableFuture.isCancelled()
+ || backgroundBaseRunnableFuture.isDone()){
+ throw new IllegalStateException("Unable to schedule subCallable when a base Runnable is not running.");
+ }
+
+ //Make sure the pool is ready to go
+ if(bundleOperationService.getPoolSize() != bundleOperationService.getMaximumPoolSize()){
+ bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize());
+ bundleOperationService.prestartAllCoreThreads();
+ bundleOperationService.setCorePoolSize(0);
}
+
+ MyFuture<?> myFuture = new MyFuture(callable);
+ myFuture.setFuture(bundleOperationService.submit((Callable)myFuture));
+ return myFuture;
}
/**
logger.debug(String.format(message, args));
}
}
+
+ /**
+ * This class has two purposes. First it insures {@link #isDone()} only returns true if the deligate is not
+ * currently running and will not be running in the future: See Java BUG JDK-8073704 Second this class maintains
+ * the {@link #myFutureSet } by insurring that itself is removed when {@link #isDone()} returns true.
+ *
+ * See {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and {@link #submitBaseSubCallable(Callable)}
+ * for usage of this class
+ */
+ private class MyFuture<T> implements Future<T>, Runnable, Callable<T> {
+
+ private Future<T> future;
+ private final Runnable runnable;
+ private final Callable<T> callable;
+ private boolean isRunning;
+
+ MyFuture(Runnable runnable) {
+ this.runnable = runnable;
+ this.callable = null;
+ myFutureSet.add(this);
+ }
+
+ MyFuture(Callable<T> callable) {
+ this.runnable = null;
+ this.callable = callable;
+ myFutureSet.add(this);
+ }
+
+ void setFuture(Future<T> future) {
+ this.future = future;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ synchronized (AsyncTaskHelper.this) {
+ if (!isRunning) {
+ myFutureSetRemove();
+ }
+
+ return future.cancel(mayInterruptIfRunning);
+ }
+ }
+
+ @Override
+ public boolean isCancelled() {
+ synchronized (AsyncTaskHelper.this) {
+ return future.isCancelled();
+ }
+ }
+
+ @Override
+ public boolean isDone() {
+ synchronized (AsyncTaskHelper.this) {
+ return future.isDone() && !isRunning;
+ }
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ return future.get();
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return future.get(timeout, unit);
+ }
+
+ @Override
+ public void run() {
+ synchronized (AsyncTaskHelper.this) {
+ if(future.isCancelled()){
+ return;
+ }
+ isRunning = true;
+ }
+ try {
+ runnable.run();
+ } finally {
+ synchronized (AsyncTaskHelper.this) {
+ isRunning = false;
+
+ //The Base Runnable is expected to run again.
+ //unless it has been canceled.
+ //so only removed if it is canceled.
+ if (future.isCancelled()) {
+ myFutureSetRemove();
+ }
+ }
+ }
+ }
+
+ @Override
+ public T call() throws Exception {
+ synchronized (AsyncTaskHelper.this) {
+ if(future.isCancelled()){
+ throw new CancellationException();
+ }
+ isRunning = true;
+ }
+ try {
+ return callable.call();
+ } finally {
+ synchronized (AsyncTaskHelper.this){
+ isRunning = false;
+ myFutureSetRemove();
+ }
+ }
+ }
+
+
+ /**
+ * Removes this from the the myFutureSet.
+ * When all the BaseActionRunnable is Done notify any thread waiting in
+ * {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
+ */
+ void myFutureSetRemove(){
+ synchronized (AsyncTaskHelper.this) {
+ myFutureSet.remove(this);
+ if(myFutureSet.isEmpty()){
+ backgroundBaseRunnableFuture = null;
+ cancelCallBackForBaseRunnable = null;
+ AsyncTaskHelper.this.notifyAll();
+
+ }
+ }
+ }
+
+ }
}