Second part of onap rename
[appc.git] / appc-oam / appc-oam-bundle / src / test / java / org / onap / appc / oam / util / AsyncTaskHelperTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.oam.util;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.junit.runner.RunWith;
33 import org.mockito.Mockito;
34 import org.onap.appc.oam.AppcOam;
35 import org.onap.appc.statemachine.impl.readers.AppcOamStates;
36 import org.osgi.framework.Bundle;
37 import org.osgi.framework.FrameworkUtil;
38 import org.powermock.api.mockito.PowerMockito;
39 import org.powermock.core.classloader.annotations.PrepareForTest;
40 import org.powermock.modules.junit4.PowerMockRunner;
41
42 import java.util.LinkedList;
43 import java.util.concurrent.Future;
44 import java.util.concurrent.Semaphore;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.TimeoutException;
47 import java.util.concurrent.atomic.AtomicReference;
48 import java.util.function.Supplier;
49
50 import static org.mockito.Matchers.any;
51 import static org.mockito.Mockito.mock;
52 import static org.powermock.api.mockito.PowerMockito.mockStatic;
53
54
55 @RunWith(PowerMockRunner.class)
56 @PrepareForTest({FrameworkUtil.class})
57 public class AsyncTaskHelperTest {
58     private AsyncTaskHelper asyncTaskHelper;
59
60     private long initialDelayMillis = 0;
61     private long delayMillis = 10;
62
63
64     @Before
65     public void setUp() throws Exception {
66
67         // to avoid operation on logger fail, mock up the logger
68         EELFLogger mockLogger = mock(EELFLogger.class);
69
70
71         mockStatic(FrameworkUtil.class);
72         Bundle myBundle = mock(Bundle.class);
73         Mockito.doReturn("TestBundle").when(myBundle).getSymbolicName();
74         PowerMockito.when(FrameworkUtil.getBundle(any())).thenReturn(myBundle);
75
76         asyncTaskHelper = new AsyncTaskHelper(mockLogger);
77
78
79     }
80
81
82     @After
83     public void shutdown(){
84         asyncTaskHelper.close();
85     }
86
87
88     /**
89      * Test that Base Runnable
90      *
91      * Runs at a fix rate;
92      * Only one Base Runnable  can be scheduled at time;
93      * Future.cancle stops the Base Runnable;
94      * That another Base Runnable  can be scheduled once the previous isDone.
95      */
96     @Test
97     public void test_scheduleBaseRunnable_Base_isDone() throws Exception{
98
99
100
101         //loop is to test we can run consecutive Base Runnable
102         for(int testIteration = 0; testIteration < 3;testIteration++){
103             final ExecuteTest et = new ExecuteTest();
104
105             Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
106                     et::test
107                     , s -> { }
108                     ,initialDelayMillis
109                     ,delayMillis
110             );
111
112             //make sure it is running at a fix rate
113             Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
114             Assert.assertFalse("It Should not be Done", future.isDone());
115             Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
116             Assert.assertFalse("It Should not be Done", future.isDone());
117
118
119             //make sure a seconds Runnable cannot be scheduled when one is already running
120             try {
121                 asyncTaskHelper.scheduleBaseRunnable(et::test
122                         , s -> {}
123                         ,initialDelayMillis
124                         ,delayMillis
125                 );
126                 Assert.fail("scheduling should have been prevented.  ");
127             } catch (IllegalStateException e) {
128                 //IllegalStateException means the second scheduling was not allowed.
129             }
130
131
132             //let it cancel itself
133             et.cancelSelfOnNextExecution(future);
134
135             //it should be done after it executes itself one more time.
136             Assert.assertTrue("it should be done", waitFor(future::isDone, 5000));
137             Assert.assertTrue("The test failed to execute", et.isExecuted);
138         }
139
140
141     }
142
143
144     /**
145      * Makes sure the Future.isDone one only returns true if its runnable is not currently executing and will not
146      * execute in the future.  Default implementation of isDone() returns true immediately after the future is
147      * canceled -- Even if is there is still a thread actively executing the runnable
148      */
149     @Test
150     public void test_scheduleBaseRunnable_Base_isDone_Ignore_Interrupt() throws Exception{
151
152
153         final ExecuteTest et = new ExecuteTest();
154
155         //configure test to run long and ignore interrupt
156         et.isContinuous = true;
157         et.isIgnoreInterrupt = true;
158
159
160
161         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
162                 et::test
163                 , s->{}
164                 ,initialDelayMillis
165                 ,delayMillis
166         );
167
168         //make sure it is running
169         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
170         Assert.assertTrue("It should be running",et.waitForTestExec(1000));
171         Assert.assertFalse("It Should not be Done", future.isDone());
172
173         //cancel it and make sure it is still running
174         future.cancel(true);
175         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
176         Assert.assertTrue("It should be running",et.waitForTestExec(1000));
177         Assert.assertFalse("It Should not be Done", future.isDone());
178
179         //let the thread die and then make sure its done
180         et.isContinuous = false;
181         Assert.assertTrue("It should not be running",waitForNot(et::isExecuting,1000));
182         Assert.assertTrue("It Should be Done", future.isDone());
183
184     }
185
186
187
188
189     /**
190      * Make sure the base Future.isDone returns false until the sub callable has completed execution.
191      */
192     @Test
193     public void test_scheduleBaseRunnable_SubTask_isDone_Ignore_Interrupt() throws Exception{
194
195
196         final ExecuteTest baseET = new ExecuteTest();
197         final ExecuteTest subET = new ExecuteTest();
198
199         //configure sub test to run long and ignore interrupt
200         subET.isContinuous = true;
201         subET.isIgnoreInterrupt = true;
202
203
204         //schedule the Base test to run and make sure it is running.
205         Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
206                 baseET::test
207                 ,s->{}
208                 ,initialDelayMillis
209                 ,delayMillis
210                 );
211         Assert.assertTrue("baseET should be running",waitFor(baseET::isExecuted,1000));
212         Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
213
214
215         //schedule the sub task and make sure it is  running
216         Future<?> subFuture = asyncTaskHelper.submitBaseSubCallable(subET::test);
217         Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
218         Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
219         Assert.assertFalse("subET Should not be Done", subFuture.isDone());
220         Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
221
222         //cancel the base task and make sure isDone is still false
223         baseFuture.cancel(true);
224         Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
225         Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
226         Assert.assertFalse("subET Should not be Done",subFuture.isDone());
227         Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
228
229
230         //let the sub task die and and make sure the base is now finally done
231         subET.isContinuous = false;
232         Assert.assertTrue("subET should not be running",waitForNot(subET::isExecuting,1000));
233         Assert.assertTrue("subET Should be Done", subFuture.isDone());
234         Assert.assertTrue("baseET Should be Done", baseFuture.isDone());
235
236     }
237
238
239     /**
240      * Make sure the base Future.isDone returns false until the 3 sub callable has completed execution.
241      * Each sub callable will be shutdown one at a time.
242      */
243     @Test
244     public void test_scheduleBaseRunnable_SubTasks_isDone() throws Exception {
245
246
247         //loop is to test we can run consecutive Base Runnable
248         for (int testIteration = 0; testIteration < 3; testIteration++) {
249             final ExecuteTest baseET = new ExecuteTest();
250             final LinkedList<Sub> subList = new LinkedList<>();
251             for (int i = 0; i < 3; i++) {
252                 Sub sub = new Sub();
253                 sub.et.isContinuous = true;
254                 subList.add(sub);
255             }
256
257
258             //schedule the base runnable and make sure it is running
259             Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
260                     baseET::test
261                     , s -> {
262                     }
263                     , initialDelayMillis
264                     , delayMillis
265             );
266             Assert.assertTrue("baseET should be running", waitFor(baseET::isExecuted, 1000));
267             Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
268
269
270             //schedule the sub Callables and make sure these are running
271             subList.forEach(sub -> sub.future = asyncTaskHelper.submitBaseSubCallable(sub.et::test));
272             for (Sub sub : subList) {
273                 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 100));
274                 Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
275                 Assert.assertFalse("subET Should not be Done", sub.future.isDone());
276             }
277             Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
278
279
280             //On each iteration shut down a sub callable.  Make sure it stops, the others are still running and the
281             // //base is still running.
282             while (!subList.isEmpty()) {
283
284                 //stop one sub and make sure it stopped
285                 {
286                     Sub sub = subList.removeFirst();
287                     Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
288                     sub.et.isContinuous = false;
289                     Assert.assertTrue("subET should not be running", waitForNot(sub.et::isExecuting,1000));
290                     Assert.assertTrue("subET Should not be Done", sub.future.isDone());
291                 }
292
293                 //make sure the other are still running
294                 for (Sub sub : subList) {
295                     Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
296                     Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
297                     Assert.assertFalse("subET Should not be Done", sub.future.isDone());
298                 }
299
300                 //Make sure the Base is still running
301                 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
302             }
303
304             //let the base cancel itself and make sure it stops
305             baseET.cancelSelfOnNextExecution(baseFuture);
306             Assert.assertTrue("baseET should be done", waitFor(baseFuture::isDone, 1000));
307         }
308     }
309
310
311     /**
312      * Make sure SubCallable cannot be scheduled when there is not BaseRunnable
313      */
314     @Test(expected=IllegalStateException.class)
315     public void test_SubTasksScheduleFailWhenNoBase() throws Exception {
316         asyncTaskHelper.submitBaseSubCallable(()->null);
317     }
318
319
320
321     /**
322      * Make sure SubCallable cannot be scheduled when BaseRunnable is cancelled but is still actively running.
323      */
324     @Test(expected=IllegalStateException.class)
325     public void test_SubTasksScheduleFailWhenBaseCanceledBeforeisDone() throws Exception {
326
327         final ExecuteTest et = new ExecuteTest();
328         et.isContinuous = true;
329
330         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
331                 et::test
332                 , s -> { }
333                 ,initialDelayMillis
334                 ,delayMillis
335         );
336
337         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
338         future.cancel(false);
339         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
340
341         try {
342             asyncTaskHelper.submitBaseSubCallable(() -> null);
343         } finally {
344             et.isContinuous = false;
345         }
346
347
348
349     }
350
351
352     /**
353      * Make sure SubCallable cannot be scheduled after a BaseRunnable has completed
354      */
355     @Test(expected=IllegalStateException.class)
356     public void test_SubTasksScheduleFailAfterBaseDone() throws Exception {
357
358         final ExecuteTest et = new ExecuteTest();
359
360         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
361                 et::test
362                 , s -> { }
363                 ,initialDelayMillis
364                 ,delayMillis
365         );
366
367
368         future.cancel(false);
369         Assert.assertTrue("It should not be running",waitFor(future::isDone,1000));
370
371         try {
372             asyncTaskHelper.submitBaseSubCallable(() -> null);
373         } finally {
374             et.isContinuous = false;
375         }
376
377     }
378
379
380     /**
381      * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
382      * Test cancel does not block when BaseRunnable is not scheduled
383      */
384     @Test
385     public void test_cancel_noBlockingWhenBaseRunnableNotScheduled() throws Exception{
386         //nothing is running so this should return immediately without TimeoutException
387         asyncTaskHelper.cancelBaseActionRunnable(AppcOam.RPC.stop , AppcOamStates.Started , 1, TimeUnit.MILLISECONDS);
388     }
389
390
391
392     /**
393      * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
394      * Test cancel does blocks until BaseRunnable is done scheduled
395      */
396     @Test()
397     public void test_cancel_BlockingWhenBaseRunnableNotDone() throws Exception {
398
399
400         final ExecuteTest et = new ExecuteTest();
401         et.isContinuous = true;
402         et.isIgnoreInterrupt = true;
403         asyncTaskHelper.scheduleBaseRunnable(
404                 et::test
405                 , s -> {
406                 }
407                 , initialDelayMillis
408                 , delayMillis
409         );
410
411         Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
412
413
414         //we should get a timeout
415         try {
416             asyncTaskHelper.cancelBaseActionRunnable(
417                     AppcOam.RPC.stop,
418                     AppcOamStates.Started,
419                     1,
420                     TimeUnit.MILLISECONDS);
421             Assert.fail("Should have gotten TimeoutException");
422         } catch (TimeoutException e) {
423             //just ignore as it is expected
424         }
425
426
427         //release the test thread
428         et.isContinuous = false;
429
430
431         //we should not get a timeout
432         asyncTaskHelper.cancelBaseActionRunnable(
433                 AppcOam.RPC.stop,
434                 AppcOamStates.Started,
435                 1000,
436                 TimeUnit.MILLISECONDS);
437
438     }
439
440
441
442     /**
443      * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
444      * Test cancel does not block when BaseRunnable is not scheduled
445      */
446     @Test
447     public void test_BaseRunnableCancelCallback() throws Exception{
448
449         AtomicReference<AppcOam.RPC> cancelCallback = new AtomicReference<>(null);
450
451         final ExecuteTest et = new ExecuteTest();
452         et.isContinuous = true;
453         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
454                 et::test
455                 , cancelCallback::set
456                 , initialDelayMillis
457                 , delayMillis
458         );
459
460         Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
461         Assert.assertTrue("It should be running", waitForNot(future::isDone, 1000));
462
463
464         try {
465             asyncTaskHelper.cancelBaseActionRunnable(
466                     AppcOam.RPC.stop,
467                     AppcOamStates.Started,
468                     1,
469                     TimeUnit.MILLISECONDS);
470             Assert.fail("Should have gotten TimeoutException");
471         } catch (TimeoutException e) {
472            //just ignore as it is expected
473         }
474
475
476         Assert.assertEquals("Unexpected rpc in call back",AppcOam.RPC.stop,cancelCallback.get());
477     }
478
479
480
481
482
483
484
485
486     /**
487      * @return true if the negation of the expected value is returned from the supplier within the specified
488      * amount of time
489      */
490     private static boolean waitForNot(Supplier<Boolean> s,long timeoutMillis)throws Exception{
491         return waitFor(()->!s.get(),timeoutMillis);
492     }
493
494
495     /**
496      * @return true if the expected value is returned from the supplier within the specified
497      * amount of time
498      */
499     private static boolean waitFor(Supplier<Boolean> s,long timeoutMillis) throws Exception {
500         long timeout = TimeUnit.MILLISECONDS.toMillis(timeoutMillis);
501         long expiryTime = System.currentTimeMillis() + timeout;
502         long elapsedTime;
503         while(!s.get()){
504             elapsedTime = expiryTime - System.currentTimeMillis();
505             if(elapsedTime < 1) {
506                 break;
507             }
508             Thread.sleep(10);
509         }
510         return s.get();
511     }
512
513
514     /**
515      * This class is used control a thread  executed in th {@link #test()}
516      */
517     @SuppressWarnings("unused")
518     private static class ExecuteTest {
519
520
521         /** A fail safe to insure this TEst does not run indefinitely */
522         private final long EXPIRY_TIME = System.currentTimeMillis() + 10000;
523
524
525
526         /** A thread sets this value to true when it has completed the execution the of executes {@link #test()}   */
527         private  volatile boolean isExecuted = false;
528
529         /**
530          * A thread sets this value to true when it is actively executing {@link #test()} and back to false when
531          * it is not
532          */
533         private  volatile boolean isExecuting = false;
534
535         /**
536          * While this value is true, a thread will not be allowed to return from {@link #test()} It will simulate  a
537          * long execution.
538          */
539         private  volatile boolean isContinuous = false;
540
541         /**
542          * When this value is set to true, an ongoing simulation of a long execution of {@link #test()} cannot be force
543          * to abort via a  {@link Thread#interrupt()}
544          */
545         private  volatile boolean isIgnoreInterrupt = false;
546
547
548
549         /** Use to send a signal to the thread executing {@link #notifyTestExcuted(long)} */
550         private  Semaphore inner = new Semaphore(0);
551
552         /** Use to send a signal to the thread executing {@link #waitForTestExec(long)} */
553         private  Semaphore outer = new Semaphore(0);
554
555         /** The {@link Future} of the Thread executing {@link #test()}*/
556         private volatile Future<?> future;
557
558         /**
559          * When set the Thread executing {@link #test()} will cancel itself
560          * @param future - The {@link Future} of the Thread executing {@link #test()}
561          */
562         private void cancelSelfOnNextExecution(Future<?> future) {
563             this.future = future;
564         }
565
566
567         private boolean isExecuted() {
568             return isExecuted;
569         }
570
571         private boolean isExecuting() {
572             return isExecuting;
573         }
574
575
576         private boolean isContinuous() {
577             return isContinuous;
578         }
579
580
581         private boolean isIgnoreInterrupt() {
582             return isIgnoreInterrupt;
583         }
584
585
586
587         /**
588          * The thread executing this method if blocked from returning until the thread executing
589          * {@link #test()}  invokes  {@link #notifyTestExcuted(long)} or the specified time elapses
590          * @param timeoutMillis - the amount of time to wait for a execution iteration.
591          * @return true if the Thread is released because of an invocation of {@link #notifyTestExcuted(long)}
592          * @throws InterruptedException - If the Caller thread is interrupted.
593          */
594         private boolean waitForTestExec(long timeoutMillis) throws InterruptedException {
595             inner.release();
596             return outer.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
597         }
598
599
600         /**
601          * Test simulator
602          * @return  Always returns true.
603          */
604         private Boolean test() {
605             isTestExpired();
606             System.out.println("started");
607              isExecuting = true;
608              try {
609                  if (future != null) {
610                      future.cancel(false);
611                  }
612                  if(!isContinuous){
613                      notifyTestExcuted(1);
614                  }
615
616                  while(isContinuous){
617                      notifyTestExcuted(100);
618                      isTestExpired();
619                  }
620
621              } finally {
622                  isExecuting = false;
623                  isExecuted = true;
624              }
625              return true;
626         }
627
628
629         /** @throws RuntimeException if the test  has bee running too long */
630         private void isTestExpired(){
631             if(System.currentTimeMillis() > EXPIRY_TIME){
632                 throw new RuntimeException("Something went wrong the test expired.");
633             }
634         }
635
636
637         /**
638          * The thread executing {@link #test()}  if blocked from returning until another thread invokes
639          * {@link #waitForTestExec(long)} or the specified time elapses
640          * @param timeoutMillis - the amount of time to wait for a execution iteration.
641          * @return true if the Thread is released because of an invocation of {@link #waitForTestExec(long)}
642          */
643         private boolean notifyTestExcuted(long timeoutMillis){
644             try {
645                 boolean acquire = inner.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
646                 if(acquire){
647                     outer.release();
648                     System.out.println("release");
649                 }
650             } catch (InterruptedException e) {
651                 if(!isIgnoreInterrupt){
652                     return false;
653                 }
654             }
655             return true;
656         }
657     }
658
659
660     static class Sub {
661         ExecuteTest et = new ExecuteTest();
662         Future<?> future = null;
663     }
664
665 }