Changed to unmaintained
[appc.git] / appc-oam / appc-oam-bundle / src / test / java / org / onap / appc / oam / util / AsyncTaskHelperTest.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.appc.oam.util;
25
26 import com.att.eelf.configuration.EELFLogger;
27 import org.junit.After;
28 import org.junit.Assert;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.junit.runner.RunWith;
32 import org.mockito.Mockito;
33 import org.onap.appc.oam.AppcOam;
34 import org.onap.appc.statemachine.impl.readers.AppcOamStates;
35 import org.osgi.framework.Bundle;
36 import org.osgi.framework.FrameworkUtil;
37 import org.powermock.api.mockito.PowerMockito;
38 import org.powermock.core.classloader.annotations.PrepareForTest;
39 import org.powermock.modules.junit4.PowerMockRunner;
40
41 import java.util.LinkedList;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.Semaphore;
44 import java.util.concurrent.TimeUnit;
45 import java.util.concurrent.TimeoutException;
46 import java.util.concurrent.atomic.AtomicReference;
47 import java.util.function.Supplier;
48
49 import static org.mockito.Matchers.any;
50 import static org.mockito.Mockito.mock;
51 import static org.powermock.api.mockito.PowerMockito.mockStatic;
52
53
54 @RunWith(PowerMockRunner.class)
55 @PrepareForTest({FrameworkUtil.class})
56 public class AsyncTaskHelperTest {
57     private AsyncTaskHelper asyncTaskHelper;
58
59     private long initialDelayMillis = 0;
60     private long delayMillis = 10;
61
62
63     @Before
64     public void setUp() throws Exception {
65
66         // to avoid operation on logger fail, mock up the logger
67         EELFLogger mockLogger = mock(EELFLogger.class);
68
69
70         mockStatic(FrameworkUtil.class);
71         Bundle myBundle = mock(Bundle.class);
72         Mockito.doReturn("TestBundle").when(myBundle).getSymbolicName();
73         PowerMockito.when(FrameworkUtil.getBundle(any())).thenReturn(myBundle);
74
75         asyncTaskHelper = new AsyncTaskHelper(mockLogger);
76
77
78     }
79
80
81     @After
82     public void shutdown(){
83         asyncTaskHelper.close();
84     }
85
86
87     /**
88      * Test that Base Runnable
89      *
90      * Runs at a fix rate;
91      * Only one Base Runnable  can be scheduled at time;
92      * Future.cancle stops the Base Runnable;
93      * That another Base Runnable  can be scheduled once the previous isDone.
94      */
95     @Test
96     public void test_scheduleBaseRunnable_Base_isDone() throws Exception{
97
98
99
100         //loop is to test we can run consecutive Base Runnable
101         for(int testIteration = 0; testIteration < 3;testIteration++){
102             final ExecuteTest et = new ExecuteTest();
103
104             Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
105                     et::test
106                     , s -> { }
107                     ,initialDelayMillis
108                     ,delayMillis
109             );
110
111             //make sure it is running at a fix rate
112             Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
113             Assert.assertFalse("It Should not be Done", future.isDone());
114             Assert.assertTrue("It should be iterating", et.waitForTestExec(5000));
115             Assert.assertFalse("It Should not be Done", future.isDone());
116
117
118             //make sure a seconds Runnable cannot be scheduled when one is already running
119             try {
120                 asyncTaskHelper.scheduleBaseRunnable(et::test
121                         , s -> {}
122                         ,initialDelayMillis
123                         ,delayMillis
124                 );
125                 Assert.fail("scheduling should have been prevented.  ");
126             } catch (IllegalStateException e) {
127                 //IllegalStateException means the second scheduling was not allowed.
128             }
129
130
131             //let it cancel itself
132             et.cancelSelfOnNextExecution(future);
133
134             //it should be done after it executes itself one more time.
135             Assert.assertTrue("it should be done", waitFor(future::isDone, 5000));
136             Assert.assertTrue("The test failed to execute", et.isExecuted);
137         }
138
139
140     }
141
142
143     /**
144      * Makes sure the Future.isDone one only returns true if its runnable is not currently executing and will not
145      * execute in the future.  Default implementation of isDone() returns true immediately after the future is
146      * canceled -- Even if is there is still a thread actively executing the runnable
147      */
148     @Test
149     public void test_scheduleBaseRunnable_Base_isDone_Ignore_Interrupt() throws Exception{
150
151
152         final ExecuteTest et = new ExecuteTest();
153
154         //configure test to run long and ignore interrupt
155         et.isContinuous = true;
156         et.isIgnoreInterrupt = true;
157
158
159
160         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
161                 et::test
162                 , s->{}
163                 ,initialDelayMillis
164                 ,delayMillis
165         );
166
167         //make sure it is running
168         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
169         Assert.assertTrue("It should be running",et.waitForTestExec(1000));
170         Assert.assertFalse("It Should not be Done", future.isDone());
171
172         //cancel it and make sure it is still running
173         future.cancel(true);
174         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
175         Assert.assertTrue("It should be running",et.waitForTestExec(1000));
176         Assert.assertFalse("It Should not be Done", future.isDone());
177
178         //let the thread die and then make sure its done
179         et.isContinuous = false;
180         Assert.assertTrue("It should not be running",waitForNot(et::isExecuting,1000));
181         Assert.assertTrue("It Should be Done", future.isDone());
182
183     }
184
185
186
187
188     /**
189      * Make sure the base Future.isDone returns false until the sub callable has completed execution.
190      */
191     @Test
192     public void test_scheduleBaseRunnable_SubTask_isDone_Ignore_Interrupt() throws Exception{
193
194
195         final ExecuteTest baseET = new ExecuteTest();
196         final ExecuteTest subET = new ExecuteTest();
197
198         //configure sub test to run long and ignore interrupt
199         subET.isContinuous = true;
200         subET.isIgnoreInterrupt = true;
201
202
203         //schedule the Base test to run and make sure it is running.
204         Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
205                 baseET::test
206                 ,s->{}
207                 ,initialDelayMillis
208                 ,delayMillis
209                 );
210         Assert.assertTrue("baseET should be running",waitFor(baseET::isExecuted,1000));
211         Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
212
213
214         //schedule the sub task and make sure it is  running
215         Future<?> subFuture = asyncTaskHelper.submitBaseSubCallable(subET::test);
216         Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
217         Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
218         Assert.assertFalse("subET Should not be Done", subFuture.isDone());
219         Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
220
221         //cancel the base task and make sure isDone is still false
222         baseFuture.cancel(true);
223         Assert.assertTrue("subET should be running",waitFor(subET::isExecuting,1000));
224         Assert.assertTrue("subET should be running",subET.waitForTestExec(1000));
225         Assert.assertFalse("subET Should not be Done",subFuture.isDone());
226         Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
227
228
229         //let the sub task die and and make sure the base is now finally done
230         subET.isContinuous = false;
231         Assert.assertTrue("subET should not be running",waitForNot(subET::isExecuting,1000));
232         Assert.assertTrue("subET Should be Done", subFuture.isDone());
233         Assert.assertTrue("baseET Should be Done", baseFuture.isDone());
234
235     }
236
237
238     /**
239      * Make sure the base Future.isDone returns false until the 3 sub callable has completed execution.
240      * Each sub callable will be shutdown one at a time.
241      */
242     public void test_scheduleBaseRunnable_SubTasks_isDone() throws Exception {
243
244
245         //loop is to test we can run consecutive Base Runnable
246         for (int testIteration = 0; testIteration < 3; testIteration++) {
247             final ExecuteTest baseET = new ExecuteTest();
248             final LinkedList<Sub> subList = new LinkedList<>();
249             for (int i = 0; i < 3; i++) {
250                 Sub sub = new Sub();
251                 sub.et.isContinuous = true;
252                 subList.add(sub);
253             }
254
255
256             //schedule the base runnable and make sure it is running
257             Future<?> baseFuture = asyncTaskHelper.scheduleBaseRunnable(
258                     baseET::test
259                     , s -> {
260                     }
261                     , initialDelayMillis
262                     , delayMillis
263             );
264             Assert.assertTrue("baseET should be running", waitFor(baseET::isExecuted, 1000));
265             Assert.assertFalse("baseET Should not be Done because it runs at a fix rate", baseFuture.isDone());
266
267
268             //schedule the sub Callables and make sure these are running
269             subList.forEach(sub -> sub.future = asyncTaskHelper.submitBaseSubCallable(sub.et::test));
270             for (Sub sub : subList) {
271                 Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 100));
272                 Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
273                 Assert.assertFalse("subET Should not be Done", sub.future.isDone());
274             }
275             Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
276
277
278             //On each iteration shut down a sub callable.  Make sure it stops, the others are still running and the
279             // //base is still running.
280             while (!subList.isEmpty()) {
281
282                 //stop one sub and make sure it stopped
283                 {
284                     Sub sub = subList.removeFirst();
285                     Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
286                     sub.et.isContinuous = false;
287                     Assert.assertTrue("subET should not be running", waitForNot(sub.et::isExecuting,1000));
288                     Assert.assertTrue("subET Should not be Done", sub.future.isDone());
289                 }
290
291                 //make sure the other are still running
292                 for (Sub sub : subList) {
293                     Assert.assertTrue("subET should be running", waitFor(sub.et::isExecuting, 1000));
294                     Assert.assertTrue("subET should be running", sub.et.waitForTestExec(1000));
295                     Assert.assertFalse("subET Should not be Done", sub.future.isDone());
296                 }
297
298                 //Make sure the Base is still running
299                 Assert.assertFalse("baseET Should not be Done", baseFuture.isDone());
300             }
301
302             //let the base cancel itself and make sure it stops
303             baseET.cancelSelfOnNextExecution(baseFuture);
304             Assert.assertTrue("baseET should be done", waitFor(baseFuture::isDone, 1000));
305         }
306     }
307
308
309     /**
310      * Make sure SubCallable cannot be scheduled when there is not BaseRunnable
311      */
312     @Test(expected=IllegalStateException.class)
313     public void test_SubTasksScheduleFailWhenNoBase() throws Exception {
314         asyncTaskHelper.submitBaseSubCallable(()->null);
315     }
316
317
318
319     /**
320      * Make sure SubCallable cannot be scheduled when BaseRunnable is cancelled but is still actively running.
321      */
322     @Test(expected=IllegalStateException.class)
323     public void test_SubTasksScheduleFailWhenBaseCanceledBeforeisDone() throws Exception {
324
325         final ExecuteTest et = new ExecuteTest();
326         et.isContinuous = true;
327
328         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
329                 et::test
330                 , s -> { }
331                 ,initialDelayMillis
332                 ,delayMillis
333         );
334
335         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
336         future.cancel(false);
337         Assert.assertTrue("It should be running",waitFor(et::isExecuting,1000));
338
339         try {
340             asyncTaskHelper.submitBaseSubCallable(() -> null);
341         } finally {
342             et.isContinuous = false;
343         }
344
345
346
347     }
348
349
350     /**
351      * Make sure SubCallable cannot be scheduled after a BaseRunnable has completed
352      */
353     @Test(expected=IllegalStateException.class)
354     public void test_SubTasksScheduleFailAfterBaseDone() throws Exception {
355
356         final ExecuteTest et = new ExecuteTest();
357
358         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
359                 et::test
360                 , s -> { }
361                 ,initialDelayMillis
362                 ,delayMillis
363         );
364
365
366         future.cancel(false);
367         Assert.assertTrue("It should not be running",waitFor(future::isDone,1000));
368
369         try {
370             asyncTaskHelper.submitBaseSubCallable(() -> null);
371         } finally {
372             et.isContinuous = false;
373         }
374
375     }
376
377
378     /**
379      * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
380      * Test cancel does not block when BaseRunnable is not scheduled
381      */
382     @Test
383     public void test_cancel_noBlockingWhenBaseRunnableNotScheduled() throws Exception{
384         //nothing is running so this should return immediately without TimeoutException
385         asyncTaskHelper.cancelBaseActionRunnable(AppcOam.RPC.stop , AppcOamStates.Started , 1, TimeUnit.MILLISECONDS);
386     }
387
388
389
390     /**
391      * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
392      * Test cancel does blocks until BaseRunnable is done scheduled
393      */
394     @Test()
395     public void test_cancel_BlockingWhenBaseRunnableNotDone() throws Exception {
396
397
398         final ExecuteTest et = new ExecuteTest();
399         et.isContinuous = true;
400         et.isIgnoreInterrupt = true;
401         asyncTaskHelper.scheduleBaseRunnable(
402                 et::test
403                 , s -> {
404                 }
405                 , initialDelayMillis
406                 , delayMillis
407         );
408
409         Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
410
411
412         //we should get a timeout
413         try {
414             asyncTaskHelper.cancelBaseActionRunnable(
415                     AppcOam.RPC.stop,
416                     AppcOamStates.Started,
417                     1,
418                     TimeUnit.MILLISECONDS);
419             Assert.fail("Should have gotten TimeoutException");
420         } catch (TimeoutException e) {
421             //just ignore as it is expected
422         }
423
424
425         //release the test thread
426         et.isContinuous = false;
427
428
429         //we should not get a timeout
430         asyncTaskHelper.cancelBaseActionRunnable(
431                 AppcOam.RPC.stop,
432                 AppcOamStates.Started,
433                 1000,
434                 TimeUnit.MILLISECONDS);
435
436     }
437
438
439
440     /**
441      * Test {@link AsyncTaskHelper#cancelBaseActionRunnable(AppcOam.RPC, AppcOamStates, long, TimeUnit)}}
442      * Test cancel does not block when BaseRunnable is not scheduled
443      */
444     @Test
445     public void test_BaseRunnableCancelCallback() throws Exception{
446
447         AtomicReference<AppcOam.RPC> cancelCallback = new AtomicReference<>(null);
448
449         final ExecuteTest et = new ExecuteTest();
450         et.isContinuous = true;
451         Future<?> future = asyncTaskHelper.scheduleBaseRunnable(
452                 et::test
453                 , cancelCallback::set
454                 , initialDelayMillis
455                 , delayMillis
456         );
457
458         Assert.assertTrue("It should be running", waitFor(et::isExecuting, 1000));
459         Assert.assertTrue("It should be running", waitForNot(future::isDone, 1000));
460
461
462         try {
463             asyncTaskHelper.cancelBaseActionRunnable(
464                     AppcOam.RPC.stop,
465                     AppcOamStates.Started,
466                     1,
467                     TimeUnit.MILLISECONDS);
468             Assert.fail("Should have gotten TimeoutException");
469         } catch (TimeoutException e) {
470            //just ignore as it is expected
471         }
472
473
474         Assert.assertEquals("Unexpected rpc in call back",AppcOam.RPC.stop,cancelCallback.get());
475     }
476
477
478
479
480
481
482
483
484     /**
485      * @return true if the negation of the expected value is returned from the supplier within the specified
486      * amount of time
487      */
488     private static boolean waitForNot(Supplier<Boolean> s,long timeoutMillis)throws Exception{
489         return waitFor(()->!s.get(),timeoutMillis);
490     }
491
492
493     /**
494      * @return true if the expected value is returned from the supplier within the specified
495      * amount of time
496      */
497     private static boolean waitFor(Supplier<Boolean> s,long timeoutMillis) throws Exception {
498         long timeout = TimeUnit.MILLISECONDS.toMillis(timeoutMillis);
499         long expiryTime = System.currentTimeMillis() + timeout;
500         long elapsedTime;
501         while(!s.get()){
502             elapsedTime = expiryTime - System.currentTimeMillis();
503             if(elapsedTime < 1) {
504                 break;
505             }
506             Thread.sleep(10);
507         }
508         return s.get();
509     }
510
511
512     /**
513      * This class is used control a thread  executed in th {@link #test()}
514      */
515     @SuppressWarnings("unused")
516     private static class ExecuteTest {
517
518
519         /** A fail safe to insure this TEst does not run indefinitely */
520         private final long EXPIRY_TIME = System.currentTimeMillis() + 10000;
521
522
523
524         /** A thread sets this value to true when it has completed the execution the of executes {@link #test()}   */
525         private  volatile boolean isExecuted = false;
526
527         /**
528          * A thread sets this value to true when it is actively executing {@link #test()} and back to false when
529          * it is not
530          */
531         private  volatile boolean isExecuting = false;
532
533         /**
534          * While this value is true, a thread will not be allowed to return from {@link #test()} It will simulate  a
535          * long execution.
536          */
537         private  volatile boolean isContinuous = false;
538
539         /**
540          * When this value is set to true, an ongoing simulation of a long execution of {@link #test()} cannot be force
541          * to abort via a  {@link Thread#interrupt()}
542          */
543         private  volatile boolean isIgnoreInterrupt = false;
544
545
546
547         /** Use to send a signal to the thread executing {@link #notifyTestExcuted(long)} */
548         private  Semaphore inner = new Semaphore(0);
549
550         /** Use to send a signal to the thread executing {@link #waitForTestExec(long)} */
551         private  Semaphore outer = new Semaphore(0);
552
553         /** The {@link Future} of the Thread executing {@link #test()}*/
554         private volatile Future<?> future;
555
556         /**
557          * When set the Thread executing {@link #test()} will cancel itself
558          * @param future - The {@link Future} of the Thread executing {@link #test()}
559          */
560         private void cancelSelfOnNextExecution(Future<?> future) {
561             this.future = future;
562         }
563
564
565         private boolean isExecuted() {
566             return isExecuted;
567         }
568
569         private boolean isExecuting() {
570             return isExecuting;
571         }
572
573
574         private boolean isContinuous() {
575             return isContinuous;
576         }
577
578
579         private boolean isIgnoreInterrupt() {
580             return isIgnoreInterrupt;
581         }
582
583
584
585         /**
586          * The thread executing this method if blocked from returning until the thread executing
587          * {@link #test()}  invokes  {@link #notifyTestExcuted(long)} or the specified time elapses
588          * @param timeoutMillis - the amount of time to wait for a execution iteration.
589          * @return true if the Thread is released because of an invocation of {@link #notifyTestExcuted(long)}
590          * @throws InterruptedException - If the Caller thread is interrupted.
591          */
592         private boolean waitForTestExec(long timeoutMillis) throws InterruptedException {
593             inner.release();
594             return outer.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
595         }
596
597
598         /**
599          * Test simulator
600          * @return  Always returns true.
601          */
602         private Boolean test() {
603             isTestExpired();
604             System.out.println("started");
605              isExecuting = true;
606              try {
607                  if (future != null) {
608                      future.cancel(false);
609                  }
610                  if(!isContinuous){
611                      notifyTestExcuted(1);
612                  }
613
614                  while(isContinuous){
615                      notifyTestExcuted(100);
616                      isTestExpired();
617                  }
618
619              } finally {
620                  isExecuting = false;
621                  isExecuted = true;
622              }
623              return true;
624         }
625
626
627         /** @throws RuntimeException if the test  has bee running too long */
628         private void isTestExpired(){
629             if(System.currentTimeMillis() > EXPIRY_TIME){
630                 throw new RuntimeException("Something went wrong the test expired.");
631             }
632         }
633
634
635         /**
636          * The thread executing {@link #test()}  if blocked from returning until another thread invokes
637          * {@link #waitForTestExec(long)} or the specified time elapses
638          * @param timeoutMillis - the amount of time to wait for a execution iteration.
639          * @return true if the Thread is released because of an invocation of {@link #waitForTestExec(long)}
640          */
641         private boolean notifyTestExcuted(long timeoutMillis){
642             try {
643                 boolean acquire = inner.tryAcquire(timeoutMillis,TimeUnit.MILLISECONDS);
644                 if(acquire){
645                     outer.release();
646                     System.out.println("release");
647                 }
648             } catch (InterruptedException e) {
649                 if(!isIgnoreInterrupt){
650                     return false;
651                 }
652             }
653             return true;
654         }
655     }
656
657
658     static class Sub {
659         ExecuteTest et = new ExecuteTest();
660         Future<?> future = null;
661     }
662
663 }