Include impacted changes for APPC-346,APPC-348
[appc.git] / appc-oam / appc-oam-bundle / src / test / java / org / onap / appc / oam / util / AsyncTaskHelperTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.oam.util;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.junit.runner.RunWith;
33 import org.mockito.Mockito;
34 import org.onap.appc.oam.AppcOam;
35 import org.onap.appc.statemachine.impl.readers.AppcOamStates;
36 import org.osgi.framework.Bundle;
37 import org.osgi.framework.FrameworkUtil;
38 import org.powermock.api.mockito.PowerMockito;
39 import org.powermock.core.classloader.annotations.PrepareForTest;
40 import org.powermock.modules.junit4.PowerMockRunner;
41
42 import java.util.LinkedList;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.Semaphore;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.TimeoutException;
47 import java.util.concurrent.atomic.AtomicReference;
48 import java.util.function.Supplier;
49
50 import static org.mockito.Matchers.any;
51 import static org.mockito.Mockito.mock;
52 import static org.powermock.api.mockito.PowerMockito.mockStatic;
53
54
55 @RunWith(PowerMockRunner.class)
56 @PrepareForTest({FrameworkUtil.class})
57 public class AsyncTaskHelperTest {
58     private AsyncTaskHelper asyncTaskHelper;
59
60     private long initialDelayMillis = 0;
61     private long delayMillis = 10;
62
63
64     @Before
65     public void setUp() throws Exception {
66
67         // to avoid operation on logger fail, mock up the logger
68         EELFLogger mockLogger = mock(EELFLogger.class);
69
70
71         mockStatic(FrameworkUtil.class);
72         Bundle myBundle = mock(Bundle.class);
73         Mockito.doReturn("TestBundle").when(myBundle).getSymbolicName();
74         PowerMockito.when(FrameworkUtil.getBundle(any())).thenReturn(myBundle);
75
76         asyncTaskHelper = new AsyncTaskHelper(mockLogger);
77
78
79     }
80
81
82     @After
83     public void shutdown(){
84         asyncTaskHelper.close();
85     }
86
87
88     /**
89      * Test that Base Runnable
90      *
91      * Runs at a fix rate;
92      * Only one Base Runnable  can be scheduled at time;
93      * Future.cancle stops the Base Runnable;
94      * That another Base Runnable  can be scheduled once the previous isDone.
95      */
96     @Test
97     public void test_scheduleBaseRunnable_Base_isDone() throws Exception{
98
99
100
101         //loop is to test we can run consecutive Base Runnable
102         for(int testIteration = 0; testIteration < 3;testIteration++){
103             final ExecuteTest et = new ExecuteTest();
104
105             Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
106                     et::test
107                     , s -> { }
108                     ,initialDelayMillis
109                     ,delayMillis
110             );
111
112             //make sure it is running at a fix rate
113             Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
114             Assert.assertFalse("It Should not be Done", future.isDone());
115             Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
116             Assert.assertFalse("It Should not be Done", future.isDone());
117
118
119             //make sure a seconds Runnable cannot be scheduled when one is already running
120             try {
121                 asyncTaskHelper.scheduleBaseRunnable(et::test
122                         , s -> {}
123                         ,initialDelayMillis
124                         ,delayMillis
125                 );
126                 Assert.fail("scheduling should have been prevented.  ");
127             } catch (IllegalStateException e) {
128                 //IllegalStateException means the second scheduling was not allowed.
129             }
130
131
132             //let it cancel itself
133             et.cancelSelfOnNextExecution(future);
134
135             //it should be done after it executes itself one more time.
136             Assert.assertTrue("it should be done", waitFor(future::isDone, 5000));
137             Assert.assertTrue("The test failed to execute", et.isExecuted);
138         }
139
140
141     }
142
143
144     /**
145      * Makes sure the Future.isDone one only returns true if its runnable is not currently executing and will not
146      * execute in the future.  Default implementation of isDone() returns true immediately after the future is
147      * canceled -- Even if is there is still a thread actively executing the runnable
148      */
149     @Test
150     public void test_scheduleBaseRunnable_Base_isDone_Ignore_Interrupt() throws Exception{
151
152
153         final ExecuteTest et = new ExecuteTest();
154
155         //configure test to run long and ignore interrupt
156         et.isContinuous = true;
157         et.isIgnoreInterrupt = true;
158
159
160
161         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
162                 et::test
163                 , s->{}
164                 ,initialDelayMillis
165                 ,delayMillis
166         );
167
168         //make sure it is running
169         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
170         Assert.assertTrue("It should be running",et.waitForTestExec(1000));
171         Assert.assertFalse("It Should not be Done", future.isDone());
172
173         //cancel it and make sure it is still running
174         future.cancel(true);
175         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
176         Assert.assertTrue("It should be running",et.waitForTestExec(1000));
177         Assert.assertFalse("It Should not be Done", future.isDone());
178
179         //let the thread die and then make sure its done
180         et.isContinuous = false;
181         Assert.assertTrue("It should not be running",waitForNot(et::isExecuting,1000));
182         Assert.assertTrue("It Should be Done", future.isDone());
183
184     }
185
186
187
188
189     /**
190      * Make sure the base Future.isDone returns false until the sub callable has completed execution.
191      */
192     @Test
193     public void test_scheduleBaseRunnable_SubTask_isDone_Ignore_Interrupt() throws Exception{
194
195
196         final ExecuteTest baseET = new ExecuteTest();
197         final ExecuteTest subET = new ExecuteTest();
198
199         //configure sub test to run long and ignore interrupt
200         subET.isContinuous = true;
201         subET.isIgnoreInterrupt = true;
202
203
204         //schedule the Base test to run and make sure it is running.
205         Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
206                 baseET::test
207                 ,s->{}
208                 ,initialDelayMillis
209                 ,delayMillis
210                 );
211         Assert.assertTrue("baseET should be running",waitFor(baseET::isExecuted,1000));
212         Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
213
214
215         //schedule the sub task and make sure it is  running
216         Future<?> subFuture = asyncTaskHelper.submitBaseSubCallable(subET::test);
217         Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
218         Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
219         Assert.assertFalse("subET Should not be Done", subFuture.isDone());
220         Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
221
222         //cancel the base task and make sure isDone is still false
223         baseFuture.cancel(true);
224         Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
225         Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
226         Assert.assertFalse("subET Should not be Done",subFuture.isDone());
227         Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
228
229
230         //let the sub task die and and make sure the base is now finally done
231         subET.isContinuous = false;
232         Assert.assertTrue("subET should not be running",waitForNot(subET::isExecuting,1000));
233         Assert.assertTrue("subET Should be Done", subFuture.isDone());
234         Assert.assertTrue("baseET Should be Done", baseFuture.isDone());
235
236     }
237
238
239     /**
240      * Make sure the base Future.isDone returns false until the 3 sub callable has completed execution.
241      * Each sub callable will be shutdown one at a time.
242      */
243     public void test_scheduleBaseRunnable_SubTasks_isDone() throws Exception {
244
245
246         //loop is to test we can run consecutive Base Runnable
247         for (int testIteration = 0; testIteration < 3; testIteration++) {
248             final ExecuteTest baseET = new ExecuteTest();
249             final LinkedList<Sub> subList = new LinkedList<>();
250             for (int i = 0; i < 3; i++) {
251                 Sub sub = new Sub();
252                 sub.et.isContinuous = true;
253                 subList.add(sub);
254             }
255
256
257             //schedule the base runnable and make sure it is running
258             Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
259                     baseET::test
260                     , s -> {
261                     }
262                     , initialDelayMillis
263                     , delayMillis
264             );
265             Assert.assertTrue("baseET should be running", waitFor(baseET::isExecuted, 1000));
266             Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
267
268
269             //schedule the sub Callables and make sure these are running
270             subList.forEach(sub -> sub.future = asyncTaskHelper.submitBaseSubCallable(sub.et::test));
271             for (Sub sub : subList) {
272                 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 100));
273                 Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
274                 Assert.assertFalse("subET Should not be Done", sub.future.isDone());
275             }
276             Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
277
278
279             //On each iteration shut down a sub callable.  Make sure it stops, the others are still running and the
280             // //base is still running.
281             while (!subList.isEmpty()) {
282
283                 //stop one sub and make sure it stopped
284                 {
285                     Sub sub = subList.removeFirst();
286                     Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
287                     sub.et.isContinuous = false;
288                     Assert.assertTrue("subET should not be running", waitForNot(sub.et::isExecuting,1000));
289                     Assert.assertTrue("subET Should not be Done", sub.future.isDone());
290                 }
291
292                 //make sure the other are still running
293                 for (Sub sub : subList) {
294                     Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
295                     Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
296                     Assert.assertFalse("subET Should not be Done", sub.future.isDone());
297                 }
298
299                 //Make sure the Base is still running
300                 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
301             }
302
303             //let the base cancel itself and make sure it stops
304             baseET.cancelSelfOnNextExecution(baseFuture);
305             Assert.assertTrue("baseET should be done", waitFor(baseFuture::isDone, 1000));
306         }
307     }
308
309
310     /**
311      * Make sure SubCallable cannot be scheduled when there is not BaseRunnable
312      */
313     @Test(expected=IllegalStateException.class)
314     public void test_SubTasksScheduleFailWhenNoBase() throws Exception {
315         asyncTaskHelper.submitBaseSubCallable(()->null);
316     }
317
318
319
320     /**
321      * Make sure SubCallable cannot be scheduled when BaseRunnable is cancelled but is still actively running.
322      */
323     @Test(expected=IllegalStateException.class)
324     public void test_SubTasksScheduleFailWhenBaseCanceledBeforeisDone() throws Exception {
325
326         final ExecuteTest et = new ExecuteTest();
327         et.isContinuous = true;
328
329         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
330                 et::test
331                 , s -> { }
332                 ,initialDelayMillis
333                 ,delayMillis
334         );
335
336         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
337         future.cancel(false);
338         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
339
340         try {
341             asyncTaskHelper.submitBaseSubCallable(() -> null);
342         } finally {
343             et.isContinuous = false;
344         }
345
346
347
348     }
349
350
351     /**
352      * Make sure SubCallable cannot be scheduled after a BaseRunnable has completed
353      */
354     @Test(expected=IllegalStateException.class)
355     public void test_SubTasksScheduleFailAfterBaseDone() throws Exception {
356
357         final ExecuteTest et = new ExecuteTest();
358
359         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
360                 et::test
361                 , s -> { }
362                 ,initialDelayMillis
363                 ,delayMillis
364         );
365
366
367         future.cancel(false);
368         Assert.assertTrue("It should not be running",waitFor(future::isDone,1000));
369
370         try {
371             asyncTaskHelper.submitBaseSubCallable(() -> null);
372         } finally {
373             et.isContinuous = false;
374         }
375
376     }
377
378
379     /**
380      * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
381      * Test cancel does not block when BaseRunnable is not scheduled
382      */
383     @Test
384     public void test_cancel_noBlockingWhenBaseRunnableNotScheduled() throws Exception{
385         //nothing is running so this should return immediately without TimeoutException
386         asyncTaskHelper.cancelBaseActionRunnable(AppcOam.RPC.stop , AppcOamStates.Started , 1, TimeUnit.MILLISECONDS);
387     }
388
389
390
391     /**
392      * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
393      * Test cancel does blocks until BaseRunnable is done scheduled
394      */
395     @Test()
396     public void test_cancel_BlockingWhenBaseRunnableNotDone() throws Exception {
397
398
399         final ExecuteTest et = new ExecuteTest();
400         et.isContinuous = true;
401         et.isIgnoreInterrupt = true;
402         asyncTaskHelper.scheduleBaseRunnable(
403                 et::test
404                 , s -> {
405                 }
406                 , initialDelayMillis
407                 , delayMillis
408         );
409
410         Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
411
412
413         //we should get a timeout
414         try {
415             asyncTaskHelper.cancelBaseActionRunnable(
416                     AppcOam.RPC.stop,
417                     AppcOamStates.Started,
418                     1,
419                     TimeUnit.MILLISECONDS);
420             Assert.fail("Should have gotten TimeoutException");
421         } catch (TimeoutException e) {
422             //just ignore as it is expected
423         }
424
425
426         //release the test thread
427         et.isContinuous = false;
428
429
430         //we should not get a timeout
431         asyncTaskHelper.cancelBaseActionRunnable(
432                 AppcOam.RPC.stop,
433                 AppcOamStates.Started,
434                 1000,
435                 TimeUnit.MILLISECONDS);
436
437     }
438
439
440
441     /**
442      * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
443      * Test cancel does not block when BaseRunnable is not scheduled
444      */
445     @Test
446     public void test_BaseRunnableCancelCallback() throws Exception{
447
448         AtomicReference<AppcOam.RPC> cancelCallback = new AtomicReference<>(null);
449
450         final ExecuteTest et = new ExecuteTest();
451         et.isContinuous = true;
452         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
453                 et::test
454                 , cancelCallback::set
455                 , initialDelayMillis
456                 , delayMillis
457         );
458
459         Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
460         Assert.assertTrue("It should be running", waitForNot(future::isDone, 1000));
461
462
463         try {
464             asyncTaskHelper.cancelBaseActionRunnable(
465                     AppcOam.RPC.stop,
466                     AppcOamStates.Started,
467                     1,
468                     TimeUnit.MILLISECONDS);
469             Assert.fail("Should have gotten TimeoutException");
470         } catch (TimeoutException e) {
471            //just ignore as it is expected
472         }
473
474
475         Assert.assertEquals("Unexpected rpc in call back",AppcOam.RPC.stop,cancelCallback.get());
476     }
477
478
479
480
481
482
483
484
485     /**
486      * @return true if the negation of the expected value is returned from the supplier within the specified
487      * amount of time
488      */
489     private static boolean waitForNot(Supplier<Boolean> s,long timeoutMillis)throws Exception{
490         return waitFor(()->!s.get(),timeoutMillis);
491     }
492
493
494     /**
495      * @return true if the expected value is returned from the supplier within the specified
496      * amount of time
497      */
498     private static boolean waitFor(Supplier<Boolean> s,long timeoutMillis) throws Exception {
499         long timeout = TimeUnit.MILLISECONDS.toMillis(timeoutMillis);
500         long expiryTime = System.currentTimeMillis() + timeout;
501         long elapsedTime;
502         while(!s.get()){
503             elapsedTime = expiryTime - System.currentTimeMillis();
504             if(elapsedTime < 1) {
505                 break;
506             }
507             Thread.sleep(10);
508         }
509         return s.get();
510     }
511
512
513     /**
514      * This class is used control a thread  executed in th {@link #test()}
515      */
516     @SuppressWarnings("unused")
517     private static class ExecuteTest {
518
519
520         /** A fail safe to insure this TEst does not run indefinitely */
521         private final long EXPIRY_TIME = System.currentTimeMillis() + 10000;
522
523
524
525         /** A thread sets this value to true when it has completed the execution the of executes {@link #test()}   */
526         private  volatile boolean isExecuted = false;
527
528         /**
529          * A thread sets this value to true when it is actively executing {@link #test()} and back to false when
530          * it is not
531          */
532         private  volatile boolean isExecuting = false;
533
534         /**
535          * While this value is true, a thread will not be allowed to return from {@link #test()} It will simulate  a
536          * long execution.
537          */
538         private  volatile boolean isContinuous = false;
539
540         /**
541          * When this value is set to true, an ongoing simulation of a long execution of {@link #test()} cannot be force
542          * to abort via a  {@link Thread#interrupt()}
543          */
544         private  volatile boolean isIgnoreInterrupt = false;
545
546
547
548         /** Use to send a signal to the thread executing {@link #notifyTestExcuted(long)} */
549         private  Semaphore inner = new Semaphore(0);
550
551         /** Use to send a signal to the thread executing {@link #waitForTestExec(long)} */
552         private  Semaphore outer = new Semaphore(0);
553
554         /** The {@link Future} of the Thread executing {@link #test()}*/
555         private volatile Future<?> future;
556
557         /**
558          * When set the Thread executing {@link #test()} will cancel itself
559          * @param future - The {@link Future} of the Thread executing {@link #test()}
560          */
561         private void cancelSelfOnNextExecution(Future<?> future) {
562             this.future = future;
563         }
564
565
566         private boolean isExecuted() {
567             return isExecuted;
568         }
569
570         private boolean isExecuting() {
571             return isExecuting;
572         }
573
574
575         private boolean isContinuous() {
576             return isContinuous;
577         }
578
579
580         private boolean isIgnoreInterrupt() {
581             return isIgnoreInterrupt;
582         }
583
584
585
586         /**
587          * The thread executing this method if blocked from returning until the thread executing
588          * {@link #test()}  invokes  {@link #notifyTestExcuted(long)} or the specified time elapses
589          * @param timeoutMillis - the amount of time to wait for a execution iteration.
590          * @return true if the Thread is released because of an invocation of {@link #notifyTestExcuted(long)}
591          * @throws InterruptedException - If the Caller thread is interrupted.
592          */
593         private boolean waitForTestExec(long timeoutMillis) throws InterruptedException {
594             inner.release();
595             return outer.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
596         }
597
598
599         /**
600          * Test simulator
601          * @return  Always returns true.
602          */
603         private Boolean test() {
604             isTestExpired();
605             System.out.println("started");
606              isExecuting = true;
607              try {
608                  if (future != null) {
609                      future.cancel(false);
610                  }
611                  if(!isContinuous){
612                      notifyTestExcuted(1);
613                  }
614
615                  while(isContinuous){
616                      notifyTestExcuted(100);
617                      isTestExpired();
618                  }
619
620              } finally {
621                  isExecuting = false;
622                  isExecuted = true;
623              }
624              return true;
625         }
626
627
628         /** @throws RuntimeException if the test  has bee running too long */
629         private void isTestExpired(){
630             if(System.currentTimeMillis() > EXPIRY_TIME){
631                 throw new RuntimeException("Something went wrong the test expired.");
632             }
633         }
634
635
636         /**
637          * The thread executing {@link #test()}  if blocked from returning until another thread invokes
638          * {@link #waitForTestExec(long)} or the specified time elapses
639          * @param timeoutMillis - the amount of time to wait for a execution iteration.
640          * @return true if the Thread is released because of an invocation of {@link #waitForTestExec(long)}
641          */
642         private boolean notifyTestExcuted(long timeoutMillis){
643             try {
644                 boolean acquire = inner.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
645                 if(acquire){
646                     outer.release();
647                     System.out.println("release");
648                 }
649             } catch (InterruptedException e) {
650                 if(!isIgnoreInterrupt){
651                     return false;
652                 }
653             }
654             return true;
655         }
656     }
657
658
659     static class Sub {
660         ExecuteTest et = new ExecuteTest();
661         Future<?> future = null;
662     }
663
664 }