2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.appc.oam.util;
26 import com.att.eelf.configuration.EELFLogger;
27 import org.junit.After;
28 import org.junit.Assert;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.junit.runner.RunWith;
32 import org.mockito.Mockito;
33 import org.onap.appc.oam.AppcOam;
34 import org.onap.appc.statemachine.impl.readers.AppcOamStates;
35 import org.osgi.framework.Bundle;
36 import org.osgi.framework.FrameworkUtil;
37 import org.powermock.api.mockito.PowerMockito;
38 import org.powermock.core.classloader.annotations.PrepareForTest;
39 import org.powermock.modules.junit4.PowerMockRunner;
41 import java.util.LinkedList;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.Semaphore;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.TimeoutException;
46 import java.util.concurrent.atomic.AtomicReference;
47 import java.util.function.Supplier;
49 import static org.mockito.Matchers.any;
50 import static org.mockito.Mockito.mock;
51 import static org.powermock.api.mockito.PowerMockito.mockStatic;
54 @RunWith(PowerMockRunner.class)
55 @PrepareForTest({FrameworkUtil.class})
56 public class AsyncTaskHelperTest {
57 private AsyncTaskHelper asyncTaskHelper;
59 private long initialDelayMillis = 0;
60 private long delayMillis = 10;
64 public void setUp() throws Exception {
66 // to avoid operation on logger fail, mock up the logger
67 EELFLogger mockLogger = mock(EELFLogger.class);
70 mockStatic(FrameworkUtil.class);
71 Bundle myBundle = mock(Bundle.class);
72 Mockito.doReturn("TestBundle").when(myBundle).getSymbolicName();
73 PowerMockito.when(FrameworkUtil.getBundle(any())).thenReturn(myBundle);
75 asyncTaskHelper = new AsyncTaskHelper(mockLogger);
82 public void shutdown(){
83 asyncTaskHelper.close();
88 * Test that Base Runnable
91 * Only one Base Runnable can be scheduled at time;
92 * Future.cancle stops the Base Runnable;
93 * That another Base Runnable can be scheduled once the previous isDone.
96 public void test_scheduleBaseRunnable_Base_isDone() throws Exception{
100 //loop is to test we can run consecutive Base Runnable
101 for(int testIteration = 0; testIteration < 3;testIteration++){
102 final ExecuteTest et = new ExecuteTest();
104 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
111 //make sure it is running at a fix rate
112 Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
113 Assert.assertFalse("It Should not be Done", future.isDone());
114 Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
115 Assert.assertFalse("It Should not be Done", future.isDone());
118 //make sure a seconds Runnable cannot be scheduled when one is already running
120 asyncTaskHelper.scheduleBaseRunnable(et::test
125 Assert.fail("scheduling should have been prevented. ");
126 } catch (IllegalStateException e) {
127 //IllegalStateException means the second scheduling was not allowed.
131 //let it cancel itself
132 et.cancelSelfOnNextExecution(future);
134 //it should be done after it executes itself one more time.
135 Assert.assertTrue("it should be done", waitFor(future::isDone, 5000));
136 Assert.assertTrue("The test failed to execute", et.isExecuted);
144 * Makes sure the Future.isDone one only returns true if its runnable is not currently executing and will not
145 * execute in the future. Default implementation of isDone() returns true immediately after the future is
146 * canceled -- Even if is there is still a thread actively executing the runnable
149 public void test_scheduleBaseRunnable_Base_isDone_Ignore_Interrupt() throws Exception{
152 final ExecuteTest et = new ExecuteTest();
154 //configure test to run long and ignore interrupt
155 et.isContinuous = true;
156 et.isIgnoreInterrupt = true;
160 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
167 //make sure it is running
168 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
169 Assert.assertTrue("It should be running",et.waitForTestExec(1000));
170 Assert.assertFalse("It Should not be Done", future.isDone());
172 //cancel it and make sure it is still running
174 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
175 Assert.assertTrue("It should be running",et.waitForTestExec(1000));
176 Assert.assertFalse("It Should not be Done", future.isDone());
178 //let the thread die and then make sure its done
179 et.isContinuous = false;
180 Assert.assertTrue("It should not be running",waitForNot(et::isExecuting,1000));
181 Assert.assertTrue("It Should be Done", future.isDone());
189 * Make sure the base Future.isDone returns false until the sub callable has completed execution.
192 public void test_scheduleBaseRunnable_SubTask_isDone_Ignore_Interrupt() throws Exception{
195 final ExecuteTest baseET = new ExecuteTest();
196 final ExecuteTest subET = new ExecuteTest();
198 //configure sub test to run long and ignore interrupt
199 subET.isContinuous = true;
200 subET.isIgnoreInterrupt = true;
203 //schedule the Base test to run and make sure it is running.
204 Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
210 Assert.assertTrue("baseET should be running",waitFor(baseET::isExecuted,1000));
211 Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
214 //schedule the sub task and make sure it is running
215 Future<?> subFuture = asyncTaskHelper.submitBaseSubCallable(subET::test);
216 Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
217 Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
218 Assert.assertFalse("subET Should not be Done", subFuture.isDone());
219 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
221 //cancel the base task and make sure isDone is still false
222 baseFuture.cancel(true);
223 Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
224 Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
225 Assert.assertFalse("subET Should not be Done",subFuture.isDone());
226 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
229 //let the sub task die and and make sure the base is now finally done
230 subET.isContinuous = false;
231 Assert.assertTrue("subET should not be running",waitForNot(subET::isExecuting,1000));
232 Assert.assertTrue("subET Should be Done", subFuture.isDone());
233 Assert.assertTrue("baseET Should be Done", baseFuture.isDone());
239 * Make sure the base Future.isDone returns false until the 3 sub callable has completed execution.
240 * Each sub callable will be shutdown one at a time.
242 public void test_scheduleBaseRunnable_SubTasks_isDone() throws Exception {
245 //loop is to test we can run consecutive Base Runnable
246 for (int testIteration = 0; testIteration < 3; testIteration++) {
247 final ExecuteTest baseET = new ExecuteTest();
248 final LinkedList<Sub> subList = new LinkedList<>();
249 for (int i = 0; i < 3; i++) {
251 sub.et.isContinuous = true;
256 //schedule the base runnable and make sure it is running
257 Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
264 Assert.assertTrue("baseET should be running", waitFor(baseET::isExecuted, 1000));
265 Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
268 //schedule the sub Callables and make sure these are running
269 subList.forEach(sub -> sub.future = asyncTaskHelper.submitBaseSubCallable(sub.et::test));
270 for (Sub sub : subList) {
271 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 100));
272 Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
273 Assert.assertFalse("subET Should not be Done", sub.future.isDone());
275 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
278 //On each iteration shut down a sub callable. Make sure it stops, the others are still running and the
279 // //base is still running.
280 while (!subList.isEmpty()) {
282 //stop one sub and make sure it stopped
284 Sub sub = subList.removeFirst();
285 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
286 sub.et.isContinuous = false;
287 Assert.assertTrue("subET should not be running", waitForNot(sub.et::isExecuting,1000));
288 Assert.assertTrue("subET Should not be Done", sub.future.isDone());
291 //make sure the other are still running
292 for (Sub sub : subList) {
293 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
294 Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
295 Assert.assertFalse("subET Should not be Done", sub.future.isDone());
298 //Make sure the Base is still running
299 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
302 //let the base cancel itself and make sure it stops
303 baseET.cancelSelfOnNextExecution(baseFuture);
304 Assert.assertTrue("baseET should be done", waitFor(baseFuture::isDone, 1000));
310 * Make sure SubCallable cannot be scheduled when there is not BaseRunnable
312 @Test(expected=IllegalStateException.class)
313 public void test_SubTasksScheduleFailWhenNoBase() throws Exception {
314 asyncTaskHelper.submitBaseSubCallable(()->null);
320 * Make sure SubCallable cannot be scheduled when BaseRunnable is cancelled but is still actively running.
322 @Test(expected=IllegalStateException.class)
323 public void test_SubTasksScheduleFailWhenBaseCanceledBeforeisDone() throws Exception {
325 final ExecuteTest et = new ExecuteTest();
326 et.isContinuous = true;
328 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
335 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
336 future.cancel(false);
337 Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
340 asyncTaskHelper.submitBaseSubCallable(() -> null);
342 et.isContinuous = false;
351 * Make sure SubCallable cannot be scheduled after a BaseRunnable has completed
353 @Test(expected=IllegalStateException.class)
354 public void test_SubTasksScheduleFailAfterBaseDone() throws Exception {
356 final ExecuteTest et = new ExecuteTest();
358 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
366 future.cancel(false);
367 Assert.assertTrue("It should not be running",waitFor(future::isDone,1000));
370 asyncTaskHelper.submitBaseSubCallable(() -> null);
372 et.isContinuous = false;
379 * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
380 * Test cancel does not block when BaseRunnable is not scheduled
383 public void test_cancel_noBlockingWhenBaseRunnableNotScheduled() throws Exception{
384 //nothing is running so this should return immediately without TimeoutException
385 asyncTaskHelper.cancelBaseActionRunnable(AppcOam.RPC.stop , AppcOamStates.Started , 1, TimeUnit.MILLISECONDS);
391 * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
392 * Test cancel does blocks until BaseRunnable is done scheduled
395 public void test_cancel_BlockingWhenBaseRunnableNotDone() throws Exception {
398 final ExecuteTest et = new ExecuteTest();
399 et.isContinuous = true;
400 et.isIgnoreInterrupt = true;
401 asyncTaskHelper.scheduleBaseRunnable(
409 Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
412 //we should get a timeout
414 asyncTaskHelper.cancelBaseActionRunnable(
416 AppcOamStates.Started,
418 TimeUnit.MILLISECONDS);
419 Assert.fail("Should have gotten TimeoutException");
420 } catch (TimeoutException e) {
421 //just ignore as it is expected
425 //release the test thread
426 et.isContinuous = false;
429 //we should not get a timeout
430 asyncTaskHelper.cancelBaseActionRunnable(
432 AppcOamStates.Started,
434 TimeUnit.MILLISECONDS);
441 * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
442 * Test cancel does not block when BaseRunnable is not scheduled
445 public void test_BaseRunnableCancelCallback() throws Exception{
447 AtomicReference<AppcOam.RPC> cancelCallback = new AtomicReference<>(null);
449 final ExecuteTest et = new ExecuteTest();
450 et.isContinuous = true;
451 Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
453 , cancelCallback::set
458 Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
459 Assert.assertTrue("It should be running", waitForNot(future::isDone, 1000));
463 asyncTaskHelper.cancelBaseActionRunnable(
465 AppcOamStates.Started,
467 TimeUnit.MILLISECONDS);
468 Assert.fail("Should have gotten TimeoutException");
469 } catch (TimeoutException e) {
470 //just ignore as it is expected
474 Assert.assertEquals("Unexpected rpc in call back",AppcOam.RPC.stop,cancelCallback.get());
485 * @return true if the negation of the expected value is returned from the supplier within the specified
488 private static boolean waitForNot(Supplier<Boolean> s,long timeoutMillis)throws Exception{
489 return waitFor(()->!s.get(),timeoutMillis);
494 * @return true if the expected value is returned from the supplier within the specified
497 private static boolean waitFor(Supplier<Boolean> s,long timeoutMillis) throws Exception {
498 long timeout = TimeUnit.MILLISECONDS.toMillis(timeoutMillis);
499 long expiryTime = System.currentTimeMillis() + timeout;
502 elapsedTime = expiryTime - System.currentTimeMillis();
503 if(elapsedTime < 1) {
513 * This class is used control a thread executed in th {@link #test()}
515 @SuppressWarnings("unused")
516 private static class ExecuteTest {
519 /** A fail safe to insure this TEst does not run indefinitely */
520 private final long EXPIRY_TIME = System.currentTimeMillis() + 10000;
524 /** A thread sets this value to true when it has completed the execution the of executes {@link #test()} */
525 private volatile boolean isExecuted = false;
528 * A thread sets this value to true when it is actively executing {@link #test()} and back to false when
531 private volatile boolean isExecuting = false;
534 * While this value is true, a thread will not be allowed to return from {@link #test()} It will simulate a
537 private volatile boolean isContinuous = false;
540 * When this value is set to true, an ongoing simulation of a long execution of {@link #test()} cannot be force
541 * to abort via a {@link Thread#interrupt()}
543 private volatile boolean isIgnoreInterrupt = false;
547 /** Use to send a signal to the thread executing {@link #notifyTestExcuted(long)} */
548 private Semaphore inner = new Semaphore(0);
550 /** Use to send a signal to the thread executing {@link #waitForTestExec(long)} */
551 private Semaphore outer = new Semaphore(0);
553 /** The {@link Future} of the Thread executing {@link #test()}*/
554 private volatile Future<?> future;
557 * When set the Thread executing {@link #test()} will cancel itself
558 * @param future - The {@link Future} of the Thread executing {@link #test()}
560 private void cancelSelfOnNextExecution(Future<?> future) {
561 this.future = future;
565 private boolean isExecuted() {
569 private boolean isExecuting() {
574 private boolean isContinuous() {
579 private boolean isIgnoreInterrupt() {
580 return isIgnoreInterrupt;
586 * The thread executing this method if blocked from returning until the thread executing
587 * {@link #test()} invokes {@link #notifyTestExcuted(long)} or the specified time elapses
588 * @param timeoutMillis - the amount of time to wait for a execution iteration.
589 * @return true if the Thread is released because of an invocation of {@link #notifyTestExcuted(long)}
590 * @throws InterruptedException - If the Caller thread is interrupted.
592 private boolean waitForTestExec(long timeoutMillis) throws InterruptedException {
594 return outer.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
600 * @return Always returns true.
602 private Boolean test() {
604 System.out.println("started");
607 if (future != null) {
608 future.cancel(false);
611 notifyTestExcuted(1);
615 notifyTestExcuted(100);
627 /** @throws RuntimeException if the test has bee running too long */
628 private void isTestExpired(){
629 if(System.currentTimeMillis() > EXPIRY_TIME){
630 throw new RuntimeException("Something went wrong the test expired.");
636 * The thread executing {@link #test()} if blocked from returning until another thread invokes
637 * {@link #waitForTestExec(long)} or the specified time elapses
638 * @param timeoutMillis - the amount of time to wait for a execution iteration.
639 * @return true if the Thread is released because of an invocation of {@link #waitForTestExec(long)}
641 private boolean notifyTestExcuted(long timeoutMillis){
643 boolean acquire = inner.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
646 System.out.println("release");
648 } catch (InterruptedException e) {
649 if(!isIgnoreInterrupt){
659 ExecuteTest et = new ExecuteTest();
660 Future<?> future = null;