2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 * ============LICENSE_END=========================================================
25 package org.onap.appc.oam.util;
27 import com.att.eelf.configuration.EELFLogger;
28 import org.onap.appc.oam.AppcOam;
29 import org.onap.appc.oam.processor.BaseActionRunnable;
30 import org.onap.appc.statemachine.impl.readers.AppcOamStates;
31 import org.osgi.framework.Bundle;
32 import org.osgi.framework.FrameworkUtil;
34 import java.util.HashSet;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.CancellationException;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Executors;
40 import java.util.concurrent.Future;
41 import java.util.concurrent.LinkedBlockingQueue;
42 import java.util.concurrent.ScheduledExecutorService;
43 import java.util.concurrent.ThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.TimeoutException;
46 import java.util.function.Consumer;
49 * The AsyncTaskHelper class manages an internal parent child data structure. The parent is a transient singleton,
50 * meaning only one can exist at any given time. The parent is scheduled with the
51 * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and is executed at configured interval. It can be
52 * terminated by using the {@link Future#cancel(boolean)} or the {@link Future#cancel(boolean)} returned from \
53 * {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}.
55 * The children are scheduled using {@link #submitBaseSubCallable(Callable)}} and can only be scheduled if a parent
56 * is scheduled. Children only execute once, but can be terminated preemptively by the {@link Future#cancel(boolean)}
57 * returned from {@link #submitBaseSubCallable(Callable)} or indirectly by terminating the parent via the method
60 * This class augments the meaning of {@link Future#isDone()} in that it guarantees that this method only returns true
61 * if the scheduled {@link Runnable} or {@link Callable} is not currently executing and is not going to execute in the
62 * future. This is different than the Java core implementation of {@link Future#isDone()} in which it will return
63 * true immediately after the {@link Future#cancel(boolean)} is called. Even if a Thread is actively executing the
64 * {@link Runnable} or {@link Callable} and has not return yet. See Java BUG JDK-8073704
66 * The parent {@link Future#isDone()} has an additional augmentation in that it will not return true until all of its
67 * children's {@link Future#isDone()} also return true.
70 @SuppressWarnings("unchecked")
71 public class AsyncTaskHelper {
73 private final EELFLogger logger;
74 private final ScheduledExecutorService scheduledExecutorService;
75 private final ThreadPoolExecutor bundleOperationService;
77 /** Reference to {@link MyFuture} return from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} */
78 private MyFuture backgroundBaseRunnableFuture;
80 /** The cancel Callback from {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} */
81 private Consumer<AppcOam.RPC> cancelCallBackForBaseRunnable;
83 /** All Futures created by thus calls which have not completed -- {@link Future#isDone()} equals false */
84 private Set<MyFuture> myFutureSet = new HashSet<>();
88 * @param eelfLogger of the logger
90 public AsyncTaskHelper(EELFLogger eelfLogger) {
93 scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
95 Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
96 return new Thread(runnable, bundle.getSymbolicName() + " scheduledExecutor");
100 bundleOperationService = new ThreadPoolExecutor(
105 new LinkedBlockingQueue(), //BlockingQueue<Runnable> workQueue
107 Bundle bundle = FrameworkUtil.getBundle(AppcOam.class);
108 return new Thread(runnable, bundle.getSymbolicName() + " bundle operation executor");
114 * Terminate the class <bS>ScheduledExecutorService</b>
116 public void close() {
117 logDebug("Start shutdown scheduleExcutorService.");
118 bundleOperationService.shutdownNow();
119 scheduledExecutorService.shutdownNow();
120 logDebug("Completed shutdown scheduleExcutorService.");
125 * Cancel currently executing {@link BaseActionRunnable} if any.
126 * This method returns immediately if there is currently no {@link BaseActionRunnable} actively executing.
127 * @param rpcCausingAbort - The RPC causing the abort
128 * @param stateBeingAbborted - The current state being canceled
129 * @param timeout - The amount of time to wait for a cancel to complete
130 * @param timeUnit - The unit of time of timeout
131 * @throws TimeoutException - If {@link BaseActionRunnable} has not completely cancelled within the timeout period
132 * @throws InterruptedException - If the Thread waiting for the abort
134 public synchronized void cancelBaseActionRunnable(final AppcOam.RPC rpcCausingAbort,
135 AppcOamStates stateBeingAbborted,
136 long timeout, TimeUnit timeUnit)
137 throws TimeoutException,InterruptedException {
139 final MyFuture localBackgroundBaseRunnableFuture = backgroundBaseRunnableFuture;
140 final Consumer<AppcOam.RPC> localCancelCallBackForBaseRunnable = cancelCallBackForBaseRunnable;
142 if (localBackgroundBaseRunnableFuture == null || localBackgroundBaseRunnableFuture.isDone()) {
146 if (localCancelCallBackForBaseRunnable != null) {
147 localCancelCallBackForBaseRunnable.accept(rpcCausingAbort);
149 localBackgroundBaseRunnableFuture.cancel(true);
151 long timeoutMillis = timeUnit.toMillis(timeout);
152 long expiryTime = System.currentTimeMillis() + timeoutMillis;
153 while (!(localBackgroundBaseRunnableFuture.isDone())) {
154 long sleepTime = expiryTime - System.currentTimeMillis();
158 this.wait(sleepTime);
161 if (!localBackgroundBaseRunnableFuture.isDone()) {
162 throw new TimeoutException(String.format("Unable to abort %s in timely manner.",stateBeingAbborted));
167 * Schedule a {@link BaseActionRunnable} to begin async execution. This is the Parent {@link Runnable} for the
168 * children that are submitted by {@link #submitBaseSubCallable(Callable)}
170 * The currently executing {@link BaseActionRunnable} must fully be terminated before the next can be scheduled.
171 * This means all Tasks' {@link MyFuture#isDone()} must equal true and all threads must return to their respective
174 * @param runnable of the to be scheduled service.
175 * @param cancelCallBack to be invoked when
176 * {@link #cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)} is invoked.
177 * @param initialDelayMillis the time to delay first execution
178 * @param delayMillis the delay between the termination of one
179 * execution and the commencement of the next
180 * @return The {@link BaseActionRunnable}'s {@link Future}
181 * @throws IllegalStateException if there is currently executing Task
183 public synchronized Future<?> scheduleBaseRunnable(final Runnable runnable,
184 final Consumer<AppcOam.RPC> cancelCallBack,
185 long initialDelayMillis,
187 throws IllegalStateException {
189 if (backgroundBaseRunnableFuture != null && !backgroundBaseRunnableFuture.isDone()) {
190 throw new IllegalStateException("Unable to schedule background task when one is already running. All task must fully terminated before another can be scheduled. ");
193 this.cancelCallBackForBaseRunnable = cancelCallBack;
195 backgroundBaseRunnableFuture = new MyFuture(runnable) {
197 * augments the cancel operation to cancel all subTack too,
200 public boolean cancel(final boolean mayInterruptIfRunning) {
202 synchronized (AsyncTaskHelper.this) {
203 cancel = super.cancel(mayInterruptIfRunning);
204 //clone the set to prevent java.util.ConcurrentModificationException. The synchronized prevents
205 //other threads from modifying this set, but not itself. The f->f.cancel may modify myFutureSet by
206 //removing an entry which breaks the iteration in the forEach.
207 (new HashSet<MyFuture>(myFutureSet))
208 .stream().filter(f->!this.equals(f)).forEach(f->f.cancel(mayInterruptIfRunning));
214 * augments the isDone operation to return false until all subTask have completed too.
217 public boolean isDone() {
218 synchronized (AsyncTaskHelper.this) {
219 return myFutureSet.isEmpty();
223 backgroundBaseRunnableFuture.setFuture(
224 scheduledExecutorService.scheduleWithFixedDelay(
225 backgroundBaseRunnableFuture, initialDelayMillis, delayMillis, TimeUnit.MILLISECONDS)
227 return backgroundBaseRunnableFuture;
231 * Submits children {@link Callable} to be executed as soon as possible, A parent must have been scheduled
232 * previously via {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)}
233 * @param callable the Callable to be submitted
234 * @return The {@link Callable}'s {@link Future}
236 synchronized Future<?> submitBaseSubCallable(final Callable callable) {
238 if (backgroundBaseRunnableFuture == null
239 || backgroundBaseRunnableFuture.isCancelled()
240 || backgroundBaseRunnableFuture.isDone()){
241 throw new IllegalStateException("Unable to schedule subCallable when a base Runnable is not running.");
244 //Make sure the pool is ready to go
245 if(bundleOperationService.getPoolSize() != bundleOperationService.getMaximumPoolSize()){
246 bundleOperationService.setCorePoolSize(bundleOperationService.getMaximumPoolSize());
247 bundleOperationService.prestartAllCoreThreads();
248 bundleOperationService.setCorePoolSize(0);
251 MyFuture<?> myFuture = new MyFuture(callable);
252 myFuture.setFuture(bundleOperationService.submit((Callable)myFuture));
257 * Genral debug log when debug logging level is enabled.
258 * @param message of the log message format
259 * @param args of the objects listed in the message format
261 private void logDebug(String message, Object... args) {
262 if (logger.isDebugEnabled()) {
263 logger.debug(String.format(message, args));
268 * This class has two purposes. First it insures {@link #isDone()} only returns true if the deligate is not
269 * currently running and will not be running in the future: See Java BUG JDK-8073704 Second this class maintains
270 * the {@link #myFutureSet } by insurring that itself is removed when {@link #isDone()} returns true.
272 * See {@link #scheduleBaseRunnable(Runnable, Consumer, long, long)} and {@link #submitBaseSubCallable(Callable)}
273 * for usage of this class
275 private class MyFuture<T> implements Future<T>, Runnable, Callable<T> {
277 private Future<T> future;
278 private final Runnable runnable;
279 private final Callable<T> callable;
280 private boolean isRunning;
282 MyFuture(Runnable runnable) {
283 this.runnable = runnable;
284 this.callable = null;
285 myFutureSet.add(this);
288 MyFuture(Callable<T> callable) {
289 this.runnable = null;
290 this.callable = callable;
291 myFutureSet.add(this);
294 void setFuture(Future<T> future) {
295 this.future = future;
299 public boolean cancel(boolean mayInterruptIfRunning) {
300 synchronized (AsyncTaskHelper.this) {
305 return future.cancel(mayInterruptIfRunning);
310 public boolean isCancelled() {
311 synchronized (AsyncTaskHelper.this) {
312 return future.isCancelled();
317 public boolean isDone() {
318 synchronized (AsyncTaskHelper.this) {
319 return future.isDone() && !isRunning;
324 public T get() throws InterruptedException, ExecutionException {
329 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
330 return future.get(timeout, unit);
335 synchronized (AsyncTaskHelper.this) {
336 if(future.isCancelled()){
344 synchronized (AsyncTaskHelper.this) {
347 //The Base Runnable is expected to run again.
348 //unless it has been canceled.
349 //so only removed if it is canceled.
350 if (future.isCancelled()) {
358 public T call() throws Exception {
359 synchronized (AsyncTaskHelper.this) {
360 if(future.isCancelled()){
361 throw new CancellationException();
366 return callable.call();
368 synchronized (AsyncTaskHelper.this){
377 * Removes this from the the myFutureSet.
378 * When all the BaseActionRunnable is Done notify any thread waiting in
379 * {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
381 void myFutureSetRemove(){
382 synchronized (AsyncTaskHelper.this) {
383 myFutureSet.remove(this);
384 if(myFutureSet.isEmpty()){
385 backgroundBaseRunnableFuture = null;
386 cancelCallBackForBaseRunnable = null;
387 AsyncTaskHelper.this.notifyAll();