2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 * ============LICENSE_END=========================================================
25 package org.onap.appc.oam.util;
27 import com.att.eelf.configuration.EELFLogger;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.junit.runner.RunWith;
33 import org.mockito.Mockito;
34 import org.onap.appc.oam.AppcOam;
35 import org.onap.appc.statemachine.impl.readers.AppcOamStates;
36 import org.osgi.framework.Bundle;
37 import org.osgi.framework.FrameworkUtil;
38 import org.powermock.api.mockito.PowerMockito;
39 import org.powermock.core.classloader.annotations.PrepareForTest;
40 import org.powermock.modules.junit4.PowerMockRunner;
42 import java.util.LinkedList;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.Semaphore;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.TimeoutException;
47 import java.util.concurrent.atomic.AtomicReference;
48 import java.util.function.Supplier;
50 import static org.mockito.Matchers.any;
51 import static org.mockito.Mockito.mock;
52 import static org.powermock.api.mockito.PowerMockito.mockStatic;
55 @RunWith(PowerMockRunner.class)
56 @PrepareForTest({FrameworkUtil.class})
57 public class AsyncTaskHelperTest {
58 private AsyncTaskHelper asyncTaskHelper;
60 private long initialDelayMillis = 0;
61 private long delayMillis = 10;
65 public void setUp() throws Exception {
67 // to avoid operation on logger fail, mock up the logger
68 EELFLogger mockLogger = mock(EELFLogger.class);
71 mockStatic(FrameworkUtil.class);
72 Bundle myBundle = mock(Bundle.class);
73 Mockito.doReturn("TestBundle").when(myBundle).getSymbolicName();
74 PowerMockito.when(FrameworkUtil.getBundle(any())).thenReturn(myBundle);
76 asyncTaskHelper = new AsyncTaskHelper(mockLogger);
83 public void shutdown(){
84 asyncTaskHelper.close();
89 * Test that Base Runnable
92 * Only one Base Runnable can be scheduled at a time;
93 * Future.cancel stops the Base Runnable;
94 * Another Base Runnable can be scheduled once the previous one isDone.
97 public void test_scheduleBaseRunnable_Base_isDone() throws Exception{
101 //loop is to test we can run consecutive Base Runnable
102 for(int testIteration = 0; testIteration < 3;testIteration++){
103 final ExecuteTest et = new ExecuteTest();
105 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
112 //make sure it is running at a fixed rate
113 Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
114 Assert.assertFalse("It Should not be Done", future.isDone());
115 Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
116 Assert.assertFalse("It Should not be Done", future.isDone());
119 //make sure a second Runnable cannot be scheduled when one is already running
121 asyncTaskHelper.scheduleBaseRunnable(et::test
126 Assert.fail("scheduling should have been prevented. ");
127 } catch (IllegalStateException e) {
128 //IllegalStateException means the second scheduling was not allowed.
132 //let it cancel itself
133 et.cancelSelfOnNextExecution(future);
135 //it should be done after it executes itself one more time.
136 Assert.assertTrue("it should be done", waitFor(future::isDone, 5000));
137 Assert.assertTrue("The test failed to execute", et.isExecuted);
145 * Makes sure Future.isDone only returns true if its runnable is not currently executing and will not
146 * execute in the future. The default implementation of isDone() returns true immediately after the future is
147 * canceled -- even if there is still a thread actively executing the runnable.
150 public void test_scheduleBaseRunnable_Base_isDone_Ignore_Interrupt() throws Exception{
153 final ExecuteTest et = new ExecuteTest();
155 //configure test to run long and ignore interrupt
156 et.isContinuous = true;
157 et.isIgnoreInterrupt = true;
161 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
168 //make sure it is running
169 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
170 Assert.assertTrue("It should be running",et.waitForTestExec(1000));
171 Assert.assertFalse("It Should not be Done", future.isDone());
173 //cancel it and make sure it is still running
175 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
176 Assert.assertTrue("It should be running",et.waitForTestExec(1000));
177 Assert.assertFalse("It Should not be Done", future.isDone());
179 //let the thread die and then make sure its done
180 et.isContinuous = false;
181 Assert.assertTrue("It should not be running",waitForNot(et::isExecuting,1000));
182 Assert.assertTrue("It Should be Done", future.isDone());
190 * Make sure the base Future.isDone returns false until the sub callable has completed execution.
193 public void test_scheduleBaseRunnable_SubTask_isDone_Ignore_Interrupt() throws Exception{
196 final ExecuteTest baseET = new ExecuteTest();
197 final ExecuteTest subET = new ExecuteTest();
199 //configure sub test to run long and ignore interrupt
200 subET.isContinuous = true;
201 subET.isIgnoreInterrupt = true;
204 //schedule the Base test to run and make sure it is running.
205 Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
211 Assert.assertTrue("baseET should be running",waitFor(baseET::isExecuted,1000));
212 Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
215 //schedule the sub task and make sure it is running
216 Future<?> subFuture = asyncTaskHelper.submitBaseSubCallable(subET::test);
217 Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
218 Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
219 Assert.assertFalse("subET Should not be Done", subFuture.isDone());
220 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
222 //cancel the base task and make sure isDone is still false
223 baseFuture.cancel(true);
224 Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
225 Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
226 Assert.assertFalse("subET Should not be Done",subFuture.isDone());
227 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
230 //let the sub task die and make sure the base is now finally done
231 subET.isContinuous = false;
232 Assert.assertTrue("subET should not be running",waitForNot(subET::isExecuting,1000));
233 Assert.assertTrue("subET Should be Done", subFuture.isDone());
234 Assert.assertTrue("baseET Should be Done", baseFuture.isDone());
240 * Make sure the base Future.isDone returns false until all 3 sub callables have completed execution.
241 * The sub callables are shut down one at a time.
243 public void test_scheduleBaseRunnable_SubTasks_isDone() throws Exception {
246 //loop is to test we can run consecutive Base Runnable
247 for (int testIteration = 0; testIteration < 3; testIteration++) {
248 final ExecuteTest baseET = new ExecuteTest();
249 final LinkedList<Sub> subList = new LinkedList<>();
250 for (int i = 0; i < 3; i++) {
252 sub.et.isContinuous = true;
257 //schedule the base runnable and make sure it is running
258 Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
265 Assert.assertTrue("baseET should be running", waitFor(baseET::isExecuted, 1000));
266 Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
269 //schedule the sub Callables and make sure these are running
270 subList.forEach(sub -> sub.future = asyncTaskHelper.submitBaseSubCallable(sub.et::test));
271 for (Sub sub : subList) {
272 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 100));
273 Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
274 Assert.assertFalse("subET Should not be Done", sub.future.isDone());
276 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
279 //On each iteration shut down a sub callable. Make sure it stops, the others are still running and the
280 // //base is still running.
281 while (!subList.isEmpty()) {
283 //stop one sub and make sure it stopped
285 Sub sub = subList.removeFirst();
286 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
287 sub.et.isContinuous = false;
288 Assert.assertTrue("subET should not be running", waitForNot(sub.et::isExecuting,1000));
289 Assert.assertTrue("subET Should not be Done", sub.future.isDone());
292 //make sure the other are still running
293 for (Sub sub : subList) {
294 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
295 Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
296 Assert.assertFalse("subET Should not be Done", sub.future.isDone());
299 //Make sure the Base is still running
300 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
303 //let the base cancel itself and make sure it stops
304 baseET.cancelSelfOnNextExecution(baseFuture);
305 Assert.assertTrue("baseET should be done", waitFor(baseFuture::isDone, 1000));
311 * Make sure SubCallable cannot be scheduled when there is no BaseRunnable
313 @Test(expected=IllegalStateException.class)
314 public void test_SubTasksScheduleFailWhenNoBase() throws Exception {
315 asyncTaskHelper.submitBaseSubCallable(()->null);
321 * Make sure SubCallable cannot be scheduled when BaseRunnable is cancelled but is still actively running.
323 @Test(expected=IllegalStateException.class)
324 public void test_SubTasksScheduleFailWhenBaseCanceledBeforeisDone() throws Exception {
326 final ExecuteTest et = new ExecuteTest();
327 et.isContinuous = true;
329 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
336 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
337 future.cancel(false);
338 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
341 asyncTaskHelper.submitBaseSubCallable(() -> null);
343 et.isContinuous = false;
352 * Make sure SubCallable cannot be scheduled after a BaseRunnable has completed
354 @Test(expected=IllegalStateException.class)
355 public void test_SubTasksScheduleFailAfterBaseDone() throws Exception {
357 final ExecuteTest et = new ExecuteTest();
359 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
367 future.cancel(false);
368 Assert.assertTrue("It should not be running",waitFor(future::isDone,1000));
371 asyncTaskHelper.submitBaseSubCallable(() -> null);
373 et.isContinuous = false;
380 * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
381 * Test cancel does not block when BaseRunnable is not scheduled
384 public void test_cancel_noBlockingWhenBaseRunnableNotScheduled() throws Exception{
385 //nothing is running so this should return immediately without TimeoutException
386 asyncTaskHelper.cancelBaseActionRunnable(AppcOam.RPC.stop , AppcOamStates.Started , 1, TimeUnit.MILLISECONDS);
392 * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
393 * Test that cancel blocks until the scheduled BaseRunnable is done
396 public void test_cancel_BlockingWhenBaseRunnableNotDone() throws Exception {
399 final ExecuteTest et = new ExecuteTest();
400 et.isContinuous = true;
401 et.isIgnoreInterrupt = true;
402 asyncTaskHelper.scheduleBaseRunnable(
410 Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
413 //we should get a timeout
415 asyncTaskHelper.cancelBaseActionRunnable(
417 AppcOamStates.Started,
419 TimeUnit.MILLISECONDS);
420 Assert.fail("Should have gotten TimeoutException");
421 } catch (TimeoutException e) {
422 //just ignore as it is expected
426 //release the test thread
427 et.isContinuous = false;
430 //we should not get a timeout
431 asyncTaskHelper.cancelBaseActionRunnable(
433 AppcOamStates.Started,
435 TimeUnit.MILLISECONDS);
442 * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}
443 * Test that cancelling the BaseRunnable invokes its cancel callback with the RPC that requested the cancel
446 public void test_BaseRunnableCancelCallback() throws Exception{
448 AtomicReference<AppcOam.RPC> cancelCallback = new AtomicReference<>(null);
450 final ExecuteTest et = new ExecuteTest();
451 et.isContinuous = true;
452 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
454 , cancelCallback::set
459 Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
460 Assert.assertTrue("It should be running", waitForNot(future::isDone, 1000));
464 asyncTaskHelper.cancelBaseActionRunnable(
466 AppcOamStates.Started,
468 TimeUnit.MILLISECONDS);
469 Assert.fail("Should have gotten TimeoutException");
470 } catch (TimeoutException e) {
471 //just ignore as it is expected
475 Assert.assertEquals("Unexpected rpc in call back",AppcOam.RPC.stop,cancelCallback.get());
486 * @return true if the negation of the expected value is returned from the supplier within the specified
489 private static boolean waitForNot(Supplier<Boolean> s,long timeoutMillis)throws Exception{
490 return waitFor(()->!s.get(),timeoutMillis);
495 * @return true if the expected value is returned from the supplier within the specified
498 private static boolean waitFor(Supplier<Boolean> s,long timeoutMillis) throws Exception {
499 long timeout = TimeUnit.MILLISECONDS.toMillis(timeoutMillis);
500 long expiryTime = System.currentTimeMillis() + timeout;
503 elapsedTime = expiryTime - System.currentTimeMillis();
504 if(elapsedTime < 1) {
514 * This class is used to control a thread executing {@link #test()}
516 @SuppressWarnings("unused")
517 private static class ExecuteTest {
520 /** A fail-safe to ensure this test does not run indefinitely */
521 private final long EXPIRY_TIME = System.currentTimeMillis() + 10000;
525 /** A thread sets this value to true when it has completed the execution of {@link #test()} */
526 private volatile boolean isExecuted = false;
529 * A thread sets this value to true when it is actively executing {@link #test()} and back to false when
532 private volatile boolean isExecuting = false;
535 * While this value is true, a thread will not be allowed to return from {@link #test()} It will simulate a
538 private volatile boolean isContinuous = false;
541 * When this value is set to true, an ongoing simulation of a long execution of {@link #test()} cannot be forced
542 * to abort via a {@link Thread#interrupt()}
544 private volatile boolean isIgnoreInterrupt = false;
548 /** Used to send a signal to the thread executing {@link #notifyTestExcuted(long)} */
549 private Semaphore inner = new Semaphore(0);
551 /** Used to send a signal to the thread executing {@link #waitForTestExec(long)} */
552 private Semaphore outer = new Semaphore(0);
554 /** The {@link Future} of the Thread executing {@link #test()}*/
555 private volatile Future<?> future;
558 * When set the Thread executing {@link #test()} will cancel itself
559 * @param future - The {@link Future} of the Thread executing {@link #test()}
561 private void cancelSelfOnNextExecution(Future<?> future) {
562 this.future = future;
566 private boolean isExecuted() {
570 private boolean isExecuting() {
575 private boolean isContinuous() {
580 private boolean isIgnoreInterrupt() {
581 return isIgnoreInterrupt;
587 * The thread executing this method is blocked from returning until the thread executing
588 * {@link #test()} invokes {@link #notifyTestExcuted(long)} or the specified time elapses
589 * @param timeoutMillis - the amount of time to wait for an execution iteration.
590 * @return true if the Thread is released because of an invocation of {@link #notifyTestExcuted(long)}
591 * @throws InterruptedException - If the Caller thread is interrupted.
593 private boolean waitForTestExec(long timeoutMillis) throws InterruptedException {
595 return outer.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
601 * @return Always returns true.
603 private Boolean test() {
605 System.out.println("started");
608 if (future != null) {
609 future.cancel(false);
612 notifyTestExcuted(1);
616 notifyTestExcuted(100);
628 /** @throws RuntimeException if the test has been running too long */
629 private void isTestExpired(){
630 if(System.currentTimeMillis() > EXPIRY_TIME){
631 throw new RuntimeException("Something went wrong the test expired.");
637 * The thread executing {@link #test()} is blocked from returning until another thread invokes
638 * {@link #waitForTestExec(long)} or the specified time elapses
639 * @param timeoutMillis - the amount of time to wait for an execution iteration.
640 * @return true if the Thread is released because of an invocation of {@link #waitForTestExec(long)}
642 private boolean notifyTestExcuted(long timeoutMillis){
644 boolean acquire = inner.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
647 System.out.println("release");
649 } catch (InterruptedException e) {
650 if(!isIgnoreInterrupt){
660 ExecuteTest et = new ExecuteTest();
661 Future<?> future = null;