2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 * ============LICENSE_END=========================================================
25 package org.onap.appc.oam.util;
27 import com.att.eelf.configuration.EELFLogger;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.junit.runner.RunWith;
33 import org.mockito.Mockito;
34 import org.onap.appc.oam.AppcOam;
35 import org.onap.appc.statemachine.impl.readers.AppcOamStates;
36 import org.osgi.framework.Bundle;
37 import org.osgi.framework.FrameworkUtil;
38 import org.powermock.api.mockito.PowerMockito;
39 import org.powermock.core.classloader.annotations.PrepareForTest;
40 import org.powermock.modules.junit4.PowerMockRunner;
42 import java.util.LinkedList;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.Semaphore;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.TimeoutException;
47 import java.util.concurrent.atomic.AtomicReference;
48 import java.util.function.Supplier;
50 import static org.mockito.Matchers.any;
51 import static org.mockito.Mockito.mock;
52 import static org.powermock.api.mockito.PowerMockito.mockStatic;
55 @RunWith(PowerMockRunner.class)
56 @PrepareForTest({FrameworkUtil.class})
57 public class AsyncTaskHelperTest {
58 private AsyncTaskHelper asyncTaskHelper;
60 private long initialDelayMillis = 0;
61 private long delayMillis = 10;
65 public void setUp() throws Exception {
67 // to avoid operation on logger fail, mock up the logger
68 EELFLogger mockLogger = mock(EELFLogger.class);
71 mockStatic(FrameworkUtil.class);
72 Bundle myBundle = mock(Bundle.class);
73 Mockito.doReturn("TestBundle").when(myBundle).getSymbolicName();
74 PowerMockito.when(FrameworkUtil.getBundle(any())).thenReturn(myBundle);
76 asyncTaskHelper = new AsyncTaskHelper(mockLogger);
83 public void shutdown(){
84 asyncTaskHelper.close();
89 * Test that Base Runnable
92 * Only one Base Runnable can be scheduled at a time;
93 * Future.cancel stops the Base Runnable;
94 * That another Base Runnable can be scheduled once the previous isDone.
97 public void test_scheduleBaseRunnable_Base_isDone() throws Exception{
101 //loop is to test we can run consecutive Base Runnable
102 for(int testIteration = 0; testIteration < 3;testIteration++){
103 final ExecuteTest et = new ExecuteTest();
105 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
112 //make sure it is running at a fixed rate
113 Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
114 Assert.assertFalse("It Should not be Done", future.isDone());
115 Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
116 Assert.assertFalse("It Should not be Done", future.isDone());
119 //make sure a second Runnable cannot be scheduled when one is already running
121 asyncTaskHelper.scheduleBaseRunnable(et::test
126 Assert.fail("scheduling should have been prevented. ");
127 } catch (IllegalStateException e) {
128 //IllegalStateException means the second scheduling was not allowed.
132 //let it cancel itself
133 et.cancelSelfOnNextExecution(future);
135 //it should be done after it executes itself one more time.
136 Assert.assertTrue("it should be done", waitFor(future::isDone, 5000));
137 Assert.assertTrue("The test failed to execute", et.isExecuted);
145 * Makes sure Future.isDone only returns true if its runnable is not currently executing and will not
146 * execute in the future. The default implementation of isDone() returns true immediately after the future is
147 * canceled -- even if there is still a thread actively executing the runnable.
150 public void test_scheduleBaseRunnable_Base_isDone_Ignore_Interrupt() throws Exception{
153 final ExecuteTest et = new ExecuteTest();
155 //configure test to run long and ignore interrupt
156 et.isContinuous = true;
157 et.isIgnoreInterrupt = true;
161 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
168 //make sure it is running
169 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
170 Assert.assertTrue("It should be running",et.waitForTestExec(1000));
171 Assert.assertFalse("It Should not be Done", future.isDone());
173 //cancel it and make sure it is still running
175 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
176 Assert.assertTrue("It should be running",et.waitForTestExec(1000));
177 Assert.assertFalse("It Should not be Done", future.isDone());
179 //let the thread die and then make sure its done
180 et.isContinuous = false;
181 Assert.assertTrue("It should not be running",waitForNot(et::isExecuting,1000));
182 Assert.assertTrue("It Should be Done", future.isDone());
190 * Make sure the base Future.isDone returns false until the sub callable has completed execution.
193 public void test_scheduleBaseRunnable_SubTask_isDone_Ignore_Interrupt() throws Exception{
196 final ExecuteTest baseET = new ExecuteTest();
197 final ExecuteTest subET = new ExecuteTest();
199 //configure sub test to run long and ignore interrupt
200 subET.isContinuous = true;
201 subET.isIgnoreInterrupt = true;
204 //schedule the Base test to run and make sure it is running.
205 Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
211 Assert.assertTrue("baseET should be running",waitFor(baseET::isExecuted,1000));
212 Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
215 //schedule the sub task and make sure it is running
216 Future<?> subFuture = asyncTaskHelper.submitBaseSubCallable(subET::test);
217 Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
218 Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
219 Assert.assertFalse("subET Should not be Done", subFuture.isDone());
220 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
222 //cancel the base task and make sure isDone is still false
223 baseFuture.cancel(true);
224 Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
225 Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
226 Assert.assertFalse("subET Should not be Done",subFuture.isDone());
227 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
230 //let the sub task die and make sure the base is now finally done
231 subET.isContinuous = false;
232 Assert.assertTrue("subET should not be running",waitForNot(subET::isExecuting,1000));
233 Assert.assertTrue("subET Should be Done", subFuture.isDone());
234 Assert.assertTrue("baseET Should be Done", baseFuture.isDone());
240 * Make sure the base Future.isDone returns false until the 3 sub callables have completed execution.
241 * Each sub callable will be shut down one at a time.
244 public void test_scheduleBaseRunnable_SubTasks_isDone() throws Exception {
247 //loop is to test we can run consecutive Base Runnable
248 for (int testIteration = 0; testIteration < 3; testIteration++) {
249 final ExecuteTest baseET = new ExecuteTest();
250 final LinkedList<Sub> subList = new LinkedList<>();
251 for (int i = 0; i < 3; i++) {
253 sub.et.isContinuous = true;
258 //schedule the base runnable and make sure it is running
259 Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
266 Assert.assertTrue("baseET should be running", waitFor(baseET::isExecuted, 1000));
267 Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
270 //schedule the sub Callables and make sure these are running
271 subList.forEach(sub -> sub.future = asyncTaskHelper.submitBaseSubCallable(sub.et::test));
272 for (Sub sub : subList) {
273 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 100));
274 Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
275 Assert.assertFalse("subET Should not be Done", sub.future.isDone());
277 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
280 //On each iteration shut down a sub callable. Make sure it stops, the others are still running and the
281 //base is still running.
282 while (!subList.isEmpty()) {
284 //stop one sub and make sure it stopped
286 Sub sub = subList.removeFirst();
287 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
288 sub.et.isContinuous = false;
289 Assert.assertTrue("subET should not be running", waitForNot(sub.et::isExecuting,1000));
290 Assert.assertTrue("subET Should not be Done", sub.future.isDone());
293 //make sure the others are still running
294 for (Sub sub : subList) {
295 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
296 Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
297 Assert.assertFalse("subET Should not be Done", sub.future.isDone());
300 //Make sure the Base is still running
301 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
304 //let the base cancel itself and make sure it stops
305 baseET.cancelSelfOnNextExecution(baseFuture);
306 Assert.assertTrue("baseET should be done", waitFor(baseFuture::isDone, 1000));
312 * Make sure SubCallable cannot be scheduled when there is no BaseRunnable
314 @Test(expected=IllegalStateException.class)
315 public void test_SubTasksScheduleFailWhenNoBase() throws Exception {
316 asyncTaskHelper.submitBaseSubCallable(()->null);
322 * Make sure SubCallable cannot be scheduled when BaseRunnable is cancelled but is still actively running.
324 @Test(expected=IllegalStateException.class)
325 public void test_SubTasksScheduleFailWhenBaseCanceledBeforeisDone() throws Exception {
327 final ExecuteTest et = new ExecuteTest();
328 et.isContinuous = true;
330 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
337 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
338 future.cancel(false);
339 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
342 asyncTaskHelper.submitBaseSubCallable(() -> null);
344 et.isContinuous = false;
353 * Make sure SubCallable cannot be scheduled after a BaseRunnable has completed
355 @Test(expected=IllegalStateException.class)
356 public void test_SubTasksScheduleFailAfterBaseDone() throws Exception {
358 final ExecuteTest et = new ExecuteTest();
360 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
368 future.cancel(false);
369 Assert.assertTrue("It should not be running",waitFor(future::isDone,1000));
372 asyncTaskHelper.submitBaseSubCallable(() -> null);
374 et.isContinuous = false;
381 * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
382 * Test cancel does not block when BaseRunnable is not scheduled
385 public void test_cancel_noBlockingWhenBaseRunnableNotScheduled() throws Exception{
386 //nothing is running so this should return immediately without TimeoutException
387 asyncTaskHelper.cancelBaseActionRunnable(AppcOam.RPC.stop , AppcOamStates.Started , 1, TimeUnit.MILLISECONDS);
393 * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
394 * Test cancel blocks until the scheduled BaseRunnable is done
397 public void test_cancel_BlockingWhenBaseRunnableNotDone() throws Exception {
400 final ExecuteTest et = new ExecuteTest();
401 et.isContinuous = true;
402 et.isIgnoreInterrupt = true;
403 asyncTaskHelper.scheduleBaseRunnable(
411 Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
414 //we should get a timeout
416 asyncTaskHelper.cancelBaseActionRunnable(
418 AppcOamStates.Started,
420 TimeUnit.MILLISECONDS);
421 Assert.fail("Should have gotten TimeoutException");
422 } catch (TimeoutException e) {
423 //just ignore as it is expected
427 //release the test thread
428 et.isContinuous = false;
431 //we should not get a timeout
432 asyncTaskHelper.cancelBaseActionRunnable(
434 AppcOamStates.Started,
436 TimeUnit.MILLISECONDS);
443 * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
444 * Test that the cancel callback registered with the BaseRunnable receives the RPC when it is cancelled
447 public void test_BaseRunnableCancelCallback() throws Exception{
449 AtomicReference<AppcOam.RPC> cancelCallback = new AtomicReference<>(null);
451 final ExecuteTest et = new ExecuteTest();
452 et.isContinuous = true;
453 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
455 , cancelCallback::set
460 Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
461 Assert.assertTrue("It should be running", waitForNot(future::isDone, 1000));
465 asyncTaskHelper.cancelBaseActionRunnable(
467 AppcOamStates.Started,
469 TimeUnit.MILLISECONDS);
470 Assert.fail("Should have gotten TimeoutException");
471 } catch (TimeoutException e) {
472 //just ignore as it is expected
476 Assert.assertEquals("Unexpected rpc in call back",AppcOam.RPC.stop,cancelCallback.get());
487 * @return true if the negation of the expected value is returned from the supplier within the specified
490 private static boolean waitForNot(Supplier<Boolean> s,long timeoutMillis)throws Exception{
491 return waitFor(()->!s.get(),timeoutMillis);
496 * @return true if the expected value is returned from the supplier within the specified
499 private static boolean waitFor(Supplier<Boolean> s,long timeoutMillis) throws Exception {
500 long timeout = TimeUnit.MILLISECONDS.toMillis(timeoutMillis);
501 long expiryTime = System.currentTimeMillis() + timeout;
504 elapsedTime = expiryTime - System.currentTimeMillis();
505 if(elapsedTime < 1) {
515 * This class is used to control a thread executing {@link #test()}
517 @SuppressWarnings("unused")
518 private static class ExecuteTest {
521 /** A fail-safe to ensure this test does not run indefinitely */
522 private final long EXPIRY_TIME = System.currentTimeMillis() + 10000;
526 /** A thread sets this value to true when it has completed the execution of {@link #test()} */
527 private volatile boolean isExecuted = false;
530 * A thread sets this value to true when it is actively executing {@link #test()} and back to false when
533 private volatile boolean isExecuting = false;
536 * While this value is true, a thread will not be allowed to return from {@link #test()} It will simulate a
539 private volatile boolean isContinuous = false;
542 * When this value is set to true, an ongoing simulation of a long execution of {@link #test()} cannot be forced
543 * to abort via a {@link Thread#interrupt()}
545 private volatile boolean isIgnoreInterrupt = false;
549 /** Use to send a signal to the thread executing {@link #notifyTestExcuted(long)} */
550 private Semaphore inner = new Semaphore(0);
552 /** Use to send a signal to the thread executing {@link #waitForTestExec(long)} */
553 private Semaphore outer = new Semaphore(0);
555 /** The {@link Future} of the Thread executing {@link #test()}*/
556 private volatile Future<?> future;
559 * When set the Thread executing {@link #test()} will cancel itself
560 * @param future - The {@link Future} of the Thread executing {@link #test()}
562 private void cancelSelfOnNextExecution(Future<?> future) {
563 this.future = future;
567 private boolean isExecuted() {
571 private boolean isExecuting() {
576 private boolean isContinuous() {
581 private boolean isIgnoreInterrupt() {
582 return isIgnoreInterrupt;
588 * The thread executing this method is blocked from returning until the thread executing
589 * {@link #test()} invokes {@link #notifyTestExcuted(long)} or the specified time elapses
590 * @param timeoutMillis - the amount of time to wait for an execution iteration.
591 * @return true if the Thread is released because of an invocation of {@link #notifyTestExcuted(long)}
592 * @throws InterruptedException - If the Caller thread is interrupted.
594 private boolean waitForTestExec(long timeoutMillis) throws InterruptedException {
596 return outer.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
602 * @return Always returns true.
604 private Boolean test() {
606 System.out.println("started");
609 if (future != null) {
610 future.cancel(false);
613 notifyTestExcuted(1);
617 notifyTestExcuted(100);
629 /** @throws RuntimeException if the test has been running too long */
630 private void isTestExpired(){
631 if(System.currentTimeMillis() > EXPIRY_TIME){
632 throw new RuntimeException("Something went wrong the test expired.");
638 * The thread executing {@link #test()} is blocked from returning until another thread invokes
639 * {@link #waitForTestExec(long)} or the specified time elapses
640 * @param timeoutMillis - the amount of time to wait for an execution iteration.
641 * @return true if the Thread is released because of an invocation of {@link #waitForTestExec(long)}
643 private boolean notifyTestExcuted(long timeoutMillis){
645 boolean acquire = inner.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
648 System.out.println("release");
650 } catch (InterruptedException e) {
651 if(!isIgnoreInterrupt){
661 ExecuteTest et = new ExecuteTest();
662 Future<?> future = null;