579d53cc3f12b93ad049f36b1e79a38cd1faf9f5
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.distributed.locking;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
27 import static org.assertj.core.api.Assertions.assertThatThrownBy;
28 import static org.junit.jupiter.api.Assertions.assertEquals;
29 import static org.junit.jupiter.api.Assertions.assertFalse;
30 import static org.junit.jupiter.api.Assertions.assertNotNull;
31 import static org.junit.jupiter.api.Assertions.assertNotSame;
32 import static org.junit.jupiter.api.Assertions.assertNull;
33 import static org.junit.jupiter.api.Assertions.assertSame;
34 import static org.junit.jupiter.api.Assertions.assertTrue;
35 import static org.mockito.ArgumentMatchers.any;
36 import static org.mockito.ArgumentMatchers.anyBoolean;
37 import static org.mockito.ArgumentMatchers.anyLong;
38 import static org.mockito.ArgumentMatchers.eq;
39 import static org.mockito.Mockito.doThrow;
40 import static org.mockito.Mockito.lenient;
41 import static org.mockito.Mockito.mock;
42 import static org.mockito.Mockito.never;
43 import static org.mockito.Mockito.times;
44 import static org.mockito.Mockito.verify;
45 import static org.mockito.Mockito.when;
46
47 import java.io.ByteArrayInputStream;
48 import java.io.ByteArrayOutputStream;
49 import java.io.ObjectInputStream;
50 import java.io.ObjectOutputStream;
51 import java.sql.Connection;
52 import java.sql.DriverManager;
53 import java.sql.PreparedStatement;
54 import java.sql.ResultSet;
55 import java.sql.SQLException;
56 import java.sql.SQLTransientException;
57 import java.util.ArrayList;
58 import java.util.List;
59 import java.util.Properties;
60 import java.util.concurrent.Executors;
61 import java.util.concurrent.RejectedExecutionException;
62 import java.util.concurrent.ScheduledExecutorService;
63 import java.util.concurrent.ScheduledFuture;
64 import java.util.concurrent.Semaphore;
65 import java.util.concurrent.TimeUnit;
66 import java.util.concurrent.atomic.AtomicBoolean;
67 import java.util.concurrent.atomic.AtomicInteger;
68 import java.util.concurrent.atomic.AtomicReference;
69 import org.apache.commons.dbcp2.BasicDataSource;
70 import org.junit.jupiter.api.AfterAll;
71 import org.junit.jupiter.api.AfterEach;
72 import org.junit.jupiter.api.BeforeAll;
73 import org.junit.jupiter.api.BeforeEach;
74 import org.junit.jupiter.api.Test;
75 import org.junit.jupiter.api.extension.ExtendWith;
76 import org.kie.api.runtime.KieSession;
77 import org.mockito.ArgumentCaptor;
78 import org.mockito.Mock;
79 import org.mockito.MockitoAnnotations;
80 import org.mockito.junit.jupiter.MockitoExtension;
81 import org.onap.policy.common.utils.services.OrderedServiceImpl;
82 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
83 import org.onap.policy.drools.core.PolicySession;
84 import org.onap.policy.drools.core.lock.Lock;
85 import org.onap.policy.drools.core.lock.LockCallback;
86 import org.onap.policy.drools.core.lock.LockState;
87 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
88 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
89 import org.onap.policy.drools.system.PolicyEngine;
90 import org.onap.policy.drools.system.PolicyEngineConstants;
91 import org.springframework.test.util.ReflectionTestUtils;
92
93 @ExtendWith(MockitoExtension.class)
94 class DistributedLockManagerTest {
95     private static final long EXPIRE_SEC = 900L;
96     private static final long RETRY_SEC = 60L;
97     private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
98     private static final String OTHER_HOST = "other-host";
99     private static final String OTHER_OWNER = "other-owner";
100     private static final String EXPECTED_EXCEPTION = "expected exception";
101     private static final String DB_CONNECTION =
102         "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
103     private static final String DB_USER = "user";
104     private static final String DB_PASSWORD = "password";
105     private static final String OWNER_KEY = "my key";
106     private static final String RESOURCE = "my resource";
107     private static final String RESOURCE2 = "my resource #2";
108     private static final String RESOURCE3 = "my resource #3";
109     private static final String RESOURCE4 = "my resource #4";
110     private static final String RESOURCE5 = "my resource #5";
111     private static final int HOLD_SEC = 100;
112     private static final int HOLD_SEC2 = 120;
113     private static final int MAX_THREADS = 5;
114     private static final int MAX_LOOPS = 100;
115     private static final boolean TRANSIENT = true;
116     private static final boolean PERMANENT = false;
117
118     // number of execute() calls before the first lock attempt
119     private static final int PRE_LOCK_EXECS = 1;
120
121     // number of execute() calls before the first schedule attempt
122     private static final int PRE_SCHED_EXECS = 1;
123
124     private static Connection conn = null;
125     private static ScheduledExecutorService saveExec;
126     private static ScheduledExecutorService realExec;
127
128     @Mock
129     private PolicyEngine engine;
130
131     @Mock
132     private KieSession kieSess;
133
134     @Mock
135     private ScheduledExecutorService exsvc;
136
137     @Mock
138     private ScheduledFuture<?> checker;
139
140     @Mock
141     private LockCallback callback;
142
143     @Mock
144     private BasicDataSource dataSource;
145
146     private DistributedLock lock;
147
148     private AtomicInteger numActive;
149     private AtomicInteger numSuccess;
150     private DistributedLockManager feature;
151
152     AutoCloseable closeable;
153
154     /**
155      * Configures the location of the property files and creates the DB.
156      *
157      * @throws SQLException if the DB cannot be created
158      */
159     @BeforeAll
160     static void setUpBeforeClass() throws SQLException {
161         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
162         PolicyEngineConstants.getManager().configure(new Properties());
163
164         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
165
166         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
167             + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
168             + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
169             createStmt.executeUpdate();
170         }
171
172         saveExec = (ScheduledExecutorService) ReflectionTestUtils.getField(PolicyEngineConstants.getManager(),
173             POLICY_ENGINE_EXECUTOR_FIELD);
174
175         realExec = Executors.newScheduledThreadPool(3);
176     }
177
178     /**
179      * Restores static fields.
180      */
181     @AfterAll
182     static void tearDownAfterClass() throws SQLException {
183         ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
184         realExec.shutdown();
185         conn.close();
186     }
187
188     /**
189      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
190      * tasks.
191      *
192      * @throws SQLException if the lock records cannot be deleted from the DB
193      */
194     @BeforeEach
195     void setUp() throws SQLException {
196         closeable = MockitoAnnotations.openMocks(this);
197         // grant() and deny() calls will come through here and be immediately executed
198         PolicySession session = new PolicySession(null, null, kieSess) {
199             @Override
200             public void insertDrools(Object object) {
201                 ((Runnable) object).run();
202             }
203         };
204
205         session.setPolicySession();
206
207         numActive = new AtomicInteger(0);
208         numSuccess = new AtomicInteger(0);
209
210         cleanDb();
211
212         feature = new MyLockingFeature(true);
213     }
214
215     @AfterEach
216     void tearDown() throws Exception {
217         shutdownFeature();
218         cleanDb();
219         closeable.close();
220     }
221
222     private void cleanDb() throws SQLException {
223         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
224             stmt.executeUpdate();
225         }
226     }
227
228     private void shutdownFeature() {
229         if (feature != null) {
230             feature.afterStop(engine);
231             feature = null;
232         }
233     }
234
235     /**
236      * Tests that the feature is found in the expected service sets.
237      */
238     @Test
239     void testServiceApis() {
240         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
241             .anyMatch(obj -> obj instanceof DistributedLockManager));
242     }
243
244     @Test
245     void testGetSequenceNumber() {
246         assertEquals(1000, feature.getSequenceNumber());
247     }
248
249     @Test
250     void testBeforeCreateLockManager() {
251         assertSame(feature, feature.beforeCreateLockManager());
252     }
253
254     /**
255      * Tests beforeCreate(), when getProperties() throws a runtime exception.
256      */
257     @Test
258     void testBeforeCreateLockManagerEx() {
259         shutdownFeature();
260
261         feature = new MyLockingFeature(false) {
262             @Override
263             protected Properties getProperties() {
264                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
265             }
266         };
267
268         assertThatThrownBy(() -> feature.beforeCreateLockManager())
269             .isInstanceOf(DistributedLockManagerException.class);
270     }
271
272     @Test
273     void testAfterStart() {
274         // verify that cleanup & expire check are both added to the queue
275         verify(exsvc).execute(any());
276         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
277     }
278
279     /**
280      * Tests afterStart(), when thread pool throws a runtime exception.
281      */
282     @Test
283     void testAfterStartExInThreadPool() {
284         shutdownFeature();
285
286         feature = new MyLockingFeature(false);
287
288         doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
289
290         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
291     }
292
293     @Test
294     void testDeleteExpiredDbLocks() throws SQLException {
295         // add records: two expired, one not
296         insertRecord(RESOURCE, feature.getUuidString(), -1);
297         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
298         insertRecord(RESOURCE3, OTHER_OWNER, 0);
299         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
300
301         // get the clean-up function and execute it
302         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
303         verify(exsvc).execute(captor.capture());
304
305         long tbegin = System.currentTimeMillis();
306         Runnable action = captor.getValue();
307         action.run();
308
309         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
310         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
311         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
312         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
313
314         assertEquals(2, getRecordCount());
315     }
316
317     /**
318      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
319      */
320     @Test
321     void testDeleteExpiredDbLocksEx() {
322         feature = new InvalidDbLockingFeature(TRANSIENT);
323
324         // get the clean-up function and execute it
325         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
326         verify(exsvc).execute(captor.capture());
327
328         Runnable action = captor.getValue();
329
330         // should not throw an exception
331         action.run();
332     }
333
334     @Test
335     void testAfterStop() {
336         shutdownFeature();
337         verify(checker).cancel(anyBoolean());
338
339         feature = new DistributedLockManager();
340
341         // shutdown without calling afterStart()
342
343         shutdownFeature();
344     }
345
346     /**
347      * Tests afterStop(), when the data source throws an exception when close() is called.
348      */
349     @Test
350     void testAfterStopEx() {
351         shutdownFeature();
352
353         // use a data source that throws an exception when closed
354         feature = new InvalidDbLockingFeature(TRANSIENT);
355
356         assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
357     }
358
359     @Test
360     void testCreateLock() throws SQLException {
361         verify(exsvc).execute(any());
362
363         lock = getLock(RESOURCE, callback);
364         assertTrue(lock.isWaiting());
365
366         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
367
368         // this lock should fail
369         LockCallback callback2 = mock(LockCallback.class);
370         DistributedLock lock2 = getLock(RESOURCE, callback2);
371         assertTrue(lock2.isUnavailable());
372         verify(callback2, never()).lockAvailable(lock2);
373         verify(callback2).lockUnavailable(lock2);
374
375         // this should fail, too
376         LockCallback callback3 = mock(LockCallback.class);
377         DistributedLock lock3 = getLock(RESOURCE, callback3);
378         assertTrue(lock3.isUnavailable());
379         verify(callback3, never()).lockAvailable(lock3);
380         verify(callback3).lockUnavailable(lock3);
381
382         // no change to first
383         assertTrue(lock.isWaiting());
384
385         // no callbacks to the first lock
386         verify(callback, never()).lockAvailable(lock);
387         verify(callback, never()).lockUnavailable(lock);
388
389         assertTrue(lock.isWaiting());
390         assertEquals(0, getRecordCount());
391
392         runLock(0, 0);
393         assertTrue(lock.isActive());
394         assertEquals(1, getRecordCount());
395
396         verify(callback).lockAvailable(lock);
397         verify(callback, never()).lockUnavailable(lock);
398
399         // this should succeed
400         DistributedLock lock4 = getLock(RESOURCE2, callback);
401         assertTrue(lock4.isWaiting());
402
403         // after running checker, original records should still remain
404         runChecker(0, EXPIRE_SEC);
405         assertEquals(1, getRecordCount());
406         verify(callback, never()).lockUnavailable(lock);
407     }
408
409     /**
410      * Tests createLock() when the feature is not the latest instance.
411      */
412     @Test
413     void testCreateLockNotLatestInstance() {
414         DistributedLockManager.setLatestInstance(null);
415
416         Lock newLock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
417         assertTrue(newLock.isUnavailable());
418         verify(callback, never()).lockAvailable(any());
419         verify(callback).lockUnavailable(newLock);
420     }
421
422     @Test
423     void testCheckExpired() throws SQLException {
424         lock = getLock(RESOURCE, callback);
425         runLock(0, 0);
426
427         LockCallback callback2 = mock(LockCallback.class);
428         final DistributedLock lock2 = getLock(RESOURCE2, callback2);
429         runLock(1, 0);
430
431         LockCallback callback3 = mock(LockCallback.class);
432         final DistributedLock lock3 = getLock(RESOURCE3, callback3);
433         runLock(2, 0);
434
435         LockCallback callback4 = mock(LockCallback.class);
436         final DistributedLock lock4 = getLock(RESOURCE4, callback4);
437         runLock(3, 0);
438
439         LockCallback callback5 = mock(LockCallback.class);
440         final DistributedLock lock5 = getLock(RESOURCE5, callback5);
441         runLock(4, 0);
442
443         assertEquals(5, getRecordCount());
444
445         // expire one record
446         updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
447
448         // change host of another record
449         updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
450
451         // change uuid of another record
452         updateRecord(RESOURCE5, feature.getPdpName(), OTHER_OWNER, HOLD_SEC);
453
454         // run the checker
455         runChecker(0, EXPIRE_SEC);
456
457         // check lock states
458         assertTrue(lock.isUnavailable());
459         assertTrue(lock2.isActive());
460         assertTrue(lock3.isUnavailable());
461         assertTrue(lock4.isActive());
462         assertTrue(lock5.isUnavailable());
463
464         // allow callbacks
465         runLock(2, 2);
466         runLock(3, 1);
467         runLock(4, 0);
468         verify(callback).lockUnavailable(lock);
469         verify(callback3).lockUnavailable(lock3);
470         verify(callback5).lockUnavailable(lock5);
471
472         verify(callback2, never()).lockUnavailable(lock2);
473         verify(callback4, never()).lockUnavailable(lock4);
474
475         // another check should have been scheduled, with the normal interval
476         runChecker(1, EXPIRE_SEC);
477     }
478
479     /**
480      * Tests checkExpired(), when schedule() throws an exception.
481      */
482     @Test
483     void testCheckExpiredExecRejected() {
484         // arrange for execution to be rejected
485         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
486             .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
487
488         runChecker(0, EXPIRE_SEC);
489     }
490
491     /**
492      * Tests checkExpired(), when getConnection() throws an exception.
493      */
494     @Test
495     void testCheckExpiredSqlEx() {
496         // use a data source that throws an exception when getConnection() is called
497         feature = new InvalidDbLockingFeature(TRANSIENT);
498
499         runChecker(0, EXPIRE_SEC);
500
501         // it should have scheduled another check, sooner
502         runChecker(0, RETRY_SEC);
503     }
504
505     /**
506      * Tests checkExpired(), when getConnection() throws an exception and the feature is
507      * no longer alive.
508      */
509     @Test
510     void testCheckExpiredSqlExFeatureStopped() {
511         // use a data source that throws an exception when getConnection() is called
512         feature = new InvalidDbLockingFeature(TRANSIENT) {
513             @Override
514             protected SQLException makeEx() {
515                 this.stop();
516                 return super.makeEx();
517             }
518         };
519
520         runChecker(0, EXPIRE_SEC);
521
522         // it should NOT have scheduled another check
523         verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
524     }
525
526     @Test
527     void testExpireLocks() throws SQLException {
528         AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
529
530         feature = new MyLockingFeature(true) {
531             @Override
532             protected BasicDataSource makeDataSource() throws SQLException {
533                 // get the real data source
534                 BasicDataSource src2 = super.makeDataSource();
535
536                 when(dataSource.getConnection()).thenAnswer(answer -> {
537                     DistributedLock lck = freeLock.getAndSet(null);
538                     if (lck != null) {
539                         // free it
540                         lck.free();
541
542                         // run its doUnlock
543                         runLock(4, 0);
544                     }
545
546                     return src2.getConnection();
547                 });
548
549                 return dataSource;
550             }
551         };
552
553         lock = getLock(RESOURCE, callback);
554         runLock(0, 0);
555
556         LockCallback callback2 = mock(LockCallback.class);
557         final DistributedLock lock2 = getLock(RESOURCE2, callback2);
558         runLock(1, 0);
559
560         LockCallback callback3 = mock(LockCallback.class);
561         final DistributedLock lock3 = getLock(RESOURCE3, callback3);
562         // don't run doLock for lock3 - leave it in the waiting state
563
564         LockCallback callback4 = mock(LockCallback.class);
565         final DistributedLock lock4 = getLock(RESOURCE4, callback4);
566         runLock(3, 0);
567
568         assertEquals(3, getRecordCount());
569
570         // expire one record
571         updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
572
573         // arrange to free lock4 while the checker is running
574         freeLock.set(lock4);
575
576         // run the checker
577         runChecker(0, EXPIRE_SEC);
578
579         // check lock states
580         assertTrue(lock.isUnavailable());
581         assertTrue(lock2.isActive());
582         assertTrue(lock3.isWaiting());
583         assertTrue(lock4.isUnavailable());
584
585         runLock(4, 0);
586         verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
587
588         verify(callback).lockUnavailable(lock);
589         verify(callback2, never()).lockUnavailable(lock2);
590         verify(callback3, never()).lockUnavailable(lock3);
591         verify(callback4, never()).lockUnavailable(lock4);
592     }
593
594     @Test
595     void testDistributedLockNoArgs() {
596         DistributedLock newLock = new DistributedLock();
597         assertNull(newLock.getResourceId());
598         assertNull(newLock.getOwnerKey());
599         assertNull(newLock.getCallback());
600         assertEquals(0, newLock.getHoldSec());
601     }
602
603     @Test
604     void testDistributedLock() {
605         assertThatIllegalArgumentException()
606             .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
607             .withMessageContaining("holdSec");
608
609         // should generate no exception
610         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
611     }
612
613     @Test
614     void testDistributedLockSerializable() throws Exception {
615         DistributedLock newLock = getLock(RESOURCE, callback);
616         newLock = roundTrip(newLock);
617
618         assertTrue(newLock.isWaiting());
619
620         assertEquals(RESOURCE, newLock.getResourceId());
621         assertEquals(OWNER_KEY, newLock.getOwnerKey());
622         assertNull(newLock.getCallback());
623         assertEquals(HOLD_SEC, newLock.getHoldSec());
624     }
625
626     @Test
627     void testGrant() {
628         lock = getLock(RESOURCE, callback);
629         assertFalse(lock.isActive());
630
631         // execute the doLock() call
632         runLock(0, 0);
633
634         assertTrue(lock.isActive());
635
636         // the callback for the lock should have been run in the foreground thread
637         verify(callback).lockAvailable(lock);
638     }
639
640     @Test
641     void testDistributedLockDeny() {
642         // get a lock
643         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
644
645         // get another lock - should fail
646         lock = getLock(RESOURCE, callback);
647
648         assertTrue(lock.isUnavailable());
649
650         // the callback for the second lock should have been run in the foreground thread
651         verify(callback).lockUnavailable(lock);
652
653         // should only have a request for the first lock
654         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
655     }
656
657     @Test
658     void testDistributedLockFree() {
659         lock = getLock(RESOURCE, callback);
660
661         assertTrue(lock.free());
662         assertTrue(lock.isUnavailable());
663
664         // run both requests associated with the lock
665         runLock(0, 1);
666         runLock(1, 0);
667
668         // should not have changed state
669         assertTrue(lock.isUnavailable());
670
671         // attempt to free it again
672         assertFalse(lock.free());
673
674         // should not have queued anything else
675         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
676
677         // new lock should succeed
678         DistributedLock lock2 = getLock(RESOURCE, callback);
679         assertNotSame(lock2, lock);
680         assertTrue(lock2.isWaiting());
681     }
682
683     /**
684      * Tests that free() works on a serialized lock with a new feature.
685      *
686      * @throws Exception if an error occurs
687      */
688     @Test
689     void testDistributedLockFreeSerialized() throws Exception {
690         DistributedLock newLock = getLock(RESOURCE, callback);
691
692         feature = new MyLockingFeature(true);
693
694         newLock = roundTrip(newLock);
695         assertTrue(newLock.free());
696         assertTrue(newLock.isUnavailable());
697     }
698
699     /**
700      * Tests free() on a serialized lock without a feature.
701      *
702      * @throws Exception if an error occurs
703      */
704     @Test
705     void testDistributedLockFreeNoFeature() throws Exception {
706         DistributedLock newLock = getLock(RESOURCE, callback);
707
708         DistributedLockManager.setLatestInstance(null);
709
710         newLock = roundTrip(newLock);
711         assertFalse(newLock.free());
712         assertTrue(newLock.isUnavailable());
713     }
714
715     /**
716      * Tests the case where the lock is freed and doUnlock called between the call to
717      * isUnavailable() and the call to compute().
718      */
719     @Test
720     void testDistributedLockFreeUnlocked() {
721         feature = new FreeWithFreeLockingFeature(true);
722
723         lock = getLock(RESOURCE, callback);
724
725         assertFalse(lock.free());
726         assertTrue(lock.isUnavailable());
727     }
728
729     /**
730      * Tests the case where the lock is freed, but doUnlock is not completed, between the
731      * call to isUnavailable() and the call to compute().
732      */
733     @Test
734     void testDistributedLockFreeLockFreed() {
735         feature = new FreeWithFreeLockingFeature(false);
736
737         lock = getLock(RESOURCE, callback);
738
739         assertFalse(lock.free());
740         assertTrue(lock.isUnavailable());
741     }
742
743     @Test
744     void testDistributedLockExtend() {
745         lock = getLock(RESOURCE, callback);
746
747         // lock2 should be denied - called back by this thread
748         DistributedLock lock2 = getLock(RESOURCE, callback);
749         verify(callback, never()).lockAvailable(lock2);
750         verify(callback).lockUnavailable(lock2);
751
752         // lock2 will still be denied - called back by this thread
753         lock2.extend(HOLD_SEC, callback);
754         verify(callback, times(2)).lockUnavailable(lock2);
755
756         // force lock2 to be active - should still be denied
757         ReflectionTestUtils.setField(lock2, "state", LockState.ACTIVE);
758         lock2.extend(HOLD_SEC, callback);
759         verify(callback, times(3)).lockUnavailable(lock2);
760
761         assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
762             .withMessageContaining("holdSec");
763
764         // execute doLock()
765         runLock(0, 0);
766         assertTrue(lock.isActive());
767
768         // now extend the first lock
769         LockCallback callback2 = mock(LockCallback.class);
770         lock.extend(HOLD_SEC2, callback2);
771         assertTrue(lock.isWaiting());
772
773         // execute doExtend()
774         runLock(1, 0);
775         lock.extend(HOLD_SEC2, callback2);
776         assertEquals(HOLD_SEC2, lock.getHoldSec());
777         verify(callback2).lockAvailable(lock);
778         verify(callback2, never()).lockUnavailable(lock);
779     }
780
781     /**
782      * Tests that extend() works on a serialized lock with a new feature.
783      *
784      * @throws Exception if an error occurs
785      */
786     @Test
787     void testDistributedLockExtendSerialized() throws Exception {
788         DistributedLock newLock = getLock(RESOURCE, callback);
789
790         // run doLock
791         runLock(0, 0);
792         assertTrue(newLock.isActive());
793
794         feature = new MyLockingFeature(true);
795
796         newLock = roundTrip(newLock);
797         assertTrue(newLock.isActive());
798
799         LockCallback mockCallback = mock(LockCallback.class);
800
801         newLock.extend(HOLD_SEC, mockCallback);
802         assertTrue(newLock.isWaiting());
803
804         // run doExtend (in new feature)
805         runLock(0, 0);
806         assertTrue(newLock.isActive());
807
808         verify(mockCallback).lockAvailable(newLock);
809         verify(mockCallback, never()).lockUnavailable(newLock);
810     }
811
812     /**
813      * Tests extend() on a serialized lock without a feature.
814      *
815      * @throws Exception if an error occurs
816      */
817     @Test
818     void testDistributedLockExtendNoFeature() throws Exception {
819         DistributedLock newLock = getLock(RESOURCE, callback);
820
821         // run doLock
822         runLock(0, 0);
823         assertTrue(newLock.isActive());
824
825         DistributedLockManager.setLatestInstance(null);
826
827         newLock = roundTrip(newLock);
828         assertTrue(newLock.isActive());
829
830         LockCallback scallback = mock(LockCallback.class);
831
832         newLock.extend(HOLD_SEC, scallback);
833         assertTrue(newLock.isUnavailable());
834
835         verify(scallback, never()).lockAvailable(newLock);
836         verify(scallback).lockUnavailable(newLock);
837     }
838
839     /**
840      * Tests the case where the lock is freed and doUnlock called between the call to
841      * isUnavailable() and the call to compute().
842      */
843     @Test
844     void testDistributedLockExtendUnlocked() {
845         feature = new FreeWithFreeLockingFeature(true);
846
847         lock = getLock(RESOURCE, callback);
848
849         lock.extend(HOLD_SEC2, callback);
850         assertTrue(lock.isUnavailable());
851         verify(callback).lockUnavailable(lock);
852     }
853
854     /**
855      * Tests the case where the lock is freed, but doUnlock is not completed, between the
856      * call to isUnavailable() and the call to compute().
857      */
858     @Test
859     void testDistributedLockExtendLockFreed() {
860         feature = new FreeWithFreeLockingFeature(false);
861
862         lock = getLock(RESOURCE, callback);
863
864         lock.extend(HOLD_SEC2, callback);
865         assertTrue(lock.isUnavailable());
866         verify(callback).lockUnavailable(lock);
867     }
868
869     @Test
870     void testDistributedLockScheduleRequest() {
871         lock = getLock(RESOURCE, callback);
872         runLock(0, 0);
873
874         verify(callback).lockAvailable(lock);
875     }
876
877     @Test
878     void testDistributedLockRescheduleRequest() {
879         // use a data source that throws an exception when getConnection() is called
880         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
881         feature = invfeat;
882
883         lock = getLock(RESOURCE, callback);
884
885         // invoke doLock - should fail and reschedule
886         runLock(0, 0);
887
888         // should still be waiting
889         assertTrue(lock.isWaiting());
890         verify(callback, never()).lockUnavailable(lock);
891
892         // free the lock while doLock is executing
893         invfeat.freeLock = true;
894
895         // try scheduled request - should just invoke doUnlock
896         runSchedule(0);
897
898         // should still be waiting
899         assertTrue(lock.isUnavailable());
900         verify(callback, never()).lockUnavailable(lock);
901
902         // should have scheduled a retry of doUnlock
903         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
904     }
905
906     @Test
907     void testDistributedLockGetNextRequest() {
908         lock = getLock(RESOURCE, callback);
909
910         /*
911          * run doLock. This should cause getNextRequest() to be called twice, once with a
912          * request in the queue, and the second time with request=null.
913          */
914         runLock(0, 0);
915     }
916
917     /**
918      * Tests getNextRequest(), where the same request is still in the queue the second
919      * time it's called.
920      */
921     @Test
922     void testDistributedLockGetNextRequestSameRequest() {
923         // force reschedule to be invoked
924         feature = new InvalidDbLockingFeature(TRANSIENT);
925
926         lock = getLock(RESOURCE, callback);
927
928         /*
929          * run doLock. This should cause getNextRequest() to be called twice, once with a
930          * request in the queue, and the second time with the same request again.
931          */
932         runLock(0, 0);
933
934         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
935     }
936
937     @Test
938     void testDistributedLockDoRequest() {
939         lock = getLock(RESOURCE, callback);
940
941         assertTrue(lock.isWaiting());
942
943         // run doLock via doRequest
944         runLock(0, 0);
945
946         assertTrue(lock.isActive());
947     }
948
949     /**
950      * Tests doRequest(), when doRequest() is already running within another thread.
951      */
952     @Test
953     void testDistributedLockDoRequestBusy() {
954         /*
955          * this feature will invoke a request in a background thread while it's being run
956          * in a foreground thread.
957          */
958         AtomicBoolean running = new AtomicBoolean(false);
959         AtomicBoolean returned = new AtomicBoolean(false);
960
961         feature = new MyLockingFeature(true) {
962             @Override
963             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
964                                                LockCallback callback) {
965                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
966                     private static final long serialVersionUID = 1L;
967
968                     @Override
969                     protected boolean doDbInsert(Connection conn) throws SQLException {
970                         if (running.get()) {
971                             // already inside the thread - don't recurse any further
972                             return super.doDbInsert(conn);
973                         }
974
975                         running.set(true);
976
977                         Thread thread = new Thread(() -> {
978                             // run doLock from within the new thread
979                             runLock(0, 0);
980                         });
981                         thread.setDaemon(true);
982                         thread.start();
983
984                         // wait for the background thread to complete before continuing
985                         try {
986                             thread.join(5000);
987                         } catch (InterruptedException ignore) {
988                             Thread.currentThread().interrupt();
989                         }
990
991                         returned.set(!thread.isAlive());
992
993                         return super.doDbInsert(conn);
994                     }
995                 };
996             }
997         };
998
999         lock = getLock(RESOURCE, callback);
1000
1001         // run doLock
1002         runLock(0, 0);
1003
1004         assertTrue(returned.get());
1005     }
1006
1007     /**
1008      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1009      *
1010      * @throws SQLException if an error occurs
1011      */
1012     @Test
1013     void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1014         // throw run-time exception
1015         when(dataSource.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1016
1017         // use a data source that throws an exception when getConnection() is called
1018         feature = new MyLockingFeature(true) {
1019             @Override
1020             protected BasicDataSource makeDataSource() {
1021                 return dataSource;
1022             }
1023         };
1024
1025         lock = getLock(RESOURCE, callback);
1026
1027         // invoke doLock - should NOT reschedule
1028         runLock(0, 0);
1029
1030         assertTrue(lock.isUnavailable());
1031         verify(callback).lockUnavailable(lock);
1032
1033         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1034     }
1035
1036     /**
1037      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1038      * state.
1039      *
1040      * @throws SQLException if an error occurs
1041      */
1042     @Test
1043     void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1044         // throw run-time exception
1045         when(dataSource.getConnection()).thenAnswer(answer -> {
1046             lock.free();
1047             throw new IllegalStateException(EXPECTED_EXCEPTION);
1048         });
1049
1050         // use a data source that throws an exception when getConnection() is called
1051         feature = new MyLockingFeature(true) {
1052             @Override
1053             protected BasicDataSource makeDataSource() {
1054                 return dataSource;
1055             }
1056         };
1057
1058         lock = getLock(RESOURCE, callback);
1059
1060         // invoke doLock - should NOT reschedule
1061         runLock(0, 0);
1062
1063         assertTrue(lock.isUnavailable());
1064         verify(callback, never()).lockUnavailable(lock);
1065
1066         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1067     }
1068
1069     /**
1070      * Tests doRequest() when the retry count gets exhausted.
1071      */
1072     @Test
1073     void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1074         // use a data source that throws an exception when getConnection() is called
1075         feature = new InvalidDbLockingFeature(TRANSIENT);
1076
1077         lock = getLock(RESOURCE, callback);
1078
1079         // invoke doLock - should fail and reschedule
1080         runLock(0, 0);
1081
1082         // should still be waiting
1083         assertTrue(lock.isWaiting());
1084         verify(callback, never()).lockUnavailable(lock);
1085
1086         // try again, via SCHEDULER - first retry fails
1087         runSchedule(0);
1088
1089         // should still be waiting
1090         assertTrue(lock.isWaiting());
1091         verify(callback, never()).lockUnavailable(lock);
1092
1093         // try again, via SCHEDULER - final retry fails
1094         runSchedule(1);
1095         assertTrue(lock.isUnavailable());
1096
1097         // now callback should have been called
1098         verify(callback).lockUnavailable(lock);
1099     }
1100
1101     /**
1102      * Tests doRequest() when a non-transient DB exception is thrown.
1103      */
1104     @Test
1105     void testDistributedLockDoRequestNotTransient() {
1106         /*
1107          * use a data source that throws a PERMANENT exception when getConnection() is
1108          * called
1109          */
1110         feature = new InvalidDbLockingFeature(PERMANENT);
1111
1112         lock = getLock(RESOURCE, callback);
1113
1114         // invoke doLock - should fail
1115         runLock(0, 0);
1116
1117         assertTrue(lock.isUnavailable());
1118         verify(callback).lockUnavailable(lock);
1119
1120         // should not have scheduled anything new
1121         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1122         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1123     }
1124
1125     @Test
1126     void testDistributedLockDoLock() throws SQLException {
1127         lock = getLock(RESOURCE, callback);
1128
1129         // invoke doLock - should simply do an insert
1130         long tbegin = System.currentTimeMillis();
1131         runLock(0, 0);
1132
1133         assertEquals(1, getRecordCount());
1134         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1135         verify(callback).lockAvailable(lock);
1136     }
1137
1138     /**
1139      * Tests doLock() when the lock is freed before doLock runs.
1140      *
1141      * @throws SQLException if an error occurs
1142      */
1143     @Test
1144     void testDistributedLockDoLockFreed() throws SQLException {
1145         lock = getLock(RESOURCE, callback);
1146
1147         lock.setState(LockState.UNAVAILABLE);
1148
1149         // invoke doLock - should do nothing
1150         runLock(0, 0);
1151
1152         assertEquals(0, getRecordCount());
1153
1154         verify(callback, never()).lockAvailable(lock);
1155     }
1156
1157     /**
1158      * Tests doLock() when a DB exception is thrown.
1159      */
1160     @Test
1161     void testDistributedLockDoLockEx() {
1162         // use a data source that throws an exception when getConnection() is called
1163         feature = new InvalidDbLockingFeature(PERMANENT);
1164
1165         lock = getLock(RESOURCE, callback);
1166
1167         // invoke doLock - should simply do an insert
1168         runLock(0, 0);
1169
1170         // lock should have failed due to exception
1171         verify(callback).lockUnavailable(lock);
1172     }
1173
1174     /**
1175      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1176      * to be called.
1177      */
1178     @Test
1179     void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1180         // insert an expired record
1181         insertRecord(RESOURCE, feature.getUuidString(), 0);
1182
1183         lock = getLock(RESOURCE, callback);
1184
1185         // invoke doLock - should simply do an update
1186         runLock(0, 0);
1187         verify(callback).lockAvailable(lock);
1188     }
1189
1190     /**
1191      * Tests doLock() when a locked record already exists.
1192      */
1193     @Test
1194     void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1195         // insert an expired record
1196         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1197
1198         lock = getLock(RESOURCE, callback);
1199
1200         // invoke doLock
1201         runLock(0, 0);
1202
1203         // lock should have failed because it's already locked
1204         verify(callback).lockUnavailable(lock);
1205     }
1206
1207     @Test
1208     void testDistributedLockDoUnlock() throws SQLException {
1209         lock = getLock(RESOURCE, callback);
1210
1211         // invoke doLock()
1212         runLock(0, 0);
1213
1214         lock.free();
1215
1216         // invoke doUnlock()
1217         long tbegin = System.currentTimeMillis();
1218         runLock(1, 0);
1219
1220         assertEquals(0, getRecordCount());
1221         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1222
1223         assertTrue(lock.isUnavailable());
1224
1225         // no more callbacks should have occurred
1226         verify(callback, times(1)).lockAvailable(lock);
1227         verify(callback, never()).lockUnavailable(lock);
1228     }
1229
1230     /**
1231      * Tests doUnlock() when a DB exception is thrown.
1232      */
1233     @Test
1234     void testDistributedLockDoUnlockEx() {
1235         feature = new InvalidDbLockingFeature(PERMANENT);
1236
1237         lock = getLock(RESOURCE, callback);
1238
1239         // do NOT invoke doLock() - it will fail without a DB connection
1240
1241         lock.free();
1242
1243         // invoke doUnlock()
1244         runLock(1, 0);
1245
1246         assertTrue(lock.isUnavailable());
1247
1248         // no more callbacks should have occurred
1249         verify(callback, never()).lockAvailable(lock);
1250         verify(callback, never()).lockUnavailable(lock);
1251     }
1252
1253     @Test
1254     void testDistributedLockDoExtend() throws SQLException {
1255         lock = getLock(RESOURCE, callback);
1256         runLock(0, 0);
1257
1258         LockCallback callback2 = mock(LockCallback.class);
1259         lock.extend(HOLD_SEC2, callback2);
1260
1261         // call doExtend()
1262         long tbegin = System.currentTimeMillis();
1263         runLock(1, 0);
1264
1265         assertEquals(1, getRecordCount());
1266         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1267
1268         assertTrue(lock.isActive());
1269
1270         // no more callbacks should have occurred
1271         verify(callback).lockAvailable(lock);
1272         verify(callback, never()).lockUnavailable(lock);
1273
1274         // extension should have succeeded
1275         verify(callback2).lockAvailable(lock);
1276         verify(callback2, never()).lockUnavailable(lock);
1277     }
1278
1279     /**
1280      * Tests doExtend() when the lock is freed before doExtend runs.
1281      *
1282      * @throws SQLException if an error occurs
1283      */
1284     @Test
1285     void testDistributedLockDoExtendFreed() throws SQLException {
1286         lock = getLock(RESOURCE, callback);
1287         lock.extend(HOLD_SEC2, callback);
1288
1289         lock.setState(LockState.UNAVAILABLE);
1290
1291         // invoke doExtend - should do nothing
1292         runLock(1, 0);
1293
1294         assertEquals(0, getRecordCount());
1295
1296         verify(callback, never()).lockAvailable(lock);
1297     }
1298
1299     /**
1300      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1301      * insert.
1302      *
1303      * @throws SQLException if an error occurs
1304      */
1305     @Test
1306     void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1307         lock = getLock(RESOURCE, callback);
1308         runLock(0, 0);
1309
1310         LockCallback callback2 = mock(LockCallback.class);
1311         lock.extend(HOLD_SEC2, callback2);
1312
1313         // delete the record so it's forced to re-insert it
1314         cleanDb();
1315
1316         // call doExtend()
1317         long tbegin = System.currentTimeMillis();
1318         runLock(1, 0);
1319
1320         assertEquals(1, getRecordCount());
1321         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1322
1323         assertTrue(lock.isActive());
1324
1325         // no more callbacks should have occurred
1326         verify(callback).lockAvailable(lock);
1327         verify(callback, never()).lockUnavailable(lock);
1328
1329         // extension should have succeeded
1330         verify(callback2).lockAvailable(lock);
1331         verify(callback2, never()).lockUnavailable(lock);
1332     }
1333
1334     /**
1335      * Tests doExtend() when both update and insert fail.
1336      */
1337     @Test
1338     void testDistributedLockDoExtendNeitherSucceeds() {
1339         /*
1340          * this feature will create a lock that returns false when doDbUpdate() is
1341          * invoked, or when doDbInsert() is invoked a second time
1342          */
1343         feature = new MyLockingFeature(true) {
1344             @Override
1345             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1346                                                LockCallback callback) {
1347                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1348                     private static final long serialVersionUID = 1L;
1349                     private int ntimes = 0;
1350
1351                     @Override
1352                     protected boolean doDbInsert(Connection conn) throws SQLException {
1353                         if (ntimes++ > 0) {
1354                             return false;
1355                         }
1356
1357                         return super.doDbInsert(conn);
1358                     }
1359
1360                     @Override
1361                     protected boolean doDbUpdate(Connection conn) {
1362                         return false;
1363                     }
1364                 };
1365             }
1366         };
1367
1368         lock = getLock(RESOURCE, callback);
1369         runLock(0, 0);
1370
1371         LockCallback callback2 = mock(LockCallback.class);
1372         lock.extend(HOLD_SEC2, callback2);
1373
1374         // call doExtend()
1375         runLock(1, 0);
1376
1377         assertTrue(lock.isUnavailable());
1378
1379         // no more callbacks should have occurred
1380         verify(callback).lockAvailable(lock);
1381         verify(callback, never()).lockUnavailable(lock);
1382
1383         // extension should have failed
1384         verify(callback2, never()).lockAvailable(lock);
1385         verify(callback2).lockUnavailable(lock);
1386     }
1387
1388     /**
1389      * Tests doExtend() when an exception occurs.
1390      *
1391      * @throws SQLException if an error occurs
1392      */
1393     @Test
1394     void testDistributedLockDoExtendEx() throws SQLException {
1395         lock = getLock(RESOURCE, callback);
1396         runLock(0, 0);
1397
1398         /*
1399          * delete the record and insert one with a different owner, which will cause
1400          * doDbInsert() to throw an exception
1401          */
1402         cleanDb();
1403         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1404
1405         LockCallback callback2 = mock(LockCallback.class);
1406         lock.extend(HOLD_SEC2, callback2);
1407
1408         // call doExtend()
1409         runLock(1, 0);
1410
1411         assertTrue(lock.isUnavailable());
1412
1413         // no more callbacks should have occurred
1414         verify(callback).lockAvailable(lock);
1415         verify(callback, never()).lockUnavailable(lock);
1416
1417         // extension should have failed
1418         verify(callback2, never()).lockAvailable(lock);
1419         verify(callback2).lockUnavailable(lock);
1420     }
1421
1422     @Test
1423     void testDistributedLockToString() {
1424         String text = getLock(RESOURCE, callback).toString();
1425         assertNotNull(text);
1426         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1427     }
1428
1429     @Test
1430     void testMakeThreadPool() {
1431         // use a REAL feature to test this
1432         feature = new DistributedLockManager();
1433
1434         // this should create a thread pool
1435         feature.beforeCreateLockManager();
1436         feature.afterStart(engine);
1437
1438         assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
1439     }
1440
1441     /**
1442      * Performs a multithreaded test of the locking facility.
1443      *
1444      * @throws InterruptedException if the current thread is interrupted while waiting for
1445      *                              the background threads to complete
1446      */
1447     @Test
1448     void testMultiThreaded() throws InterruptedException {
1449         ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1450
1451         feature = new DistributedLockManager();
1452         feature.beforeCreateLockManager();
1453         feature.afterStart(PolicyEngineConstants.getManager());
1454
1455         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1456         for (int x = 0; x < MAX_THREADS; ++x) {
1457             threads.add(new MyThread());
1458         }
1459
1460         threads.forEach(Thread::start);
1461
1462         for (MyThread thread : threads) {
1463             thread.join(6000);
1464             assertFalse(thread.isAlive());
1465         }
1466
1467         for (MyThread thread : threads) {
1468             if (thread.err != null) {
1469                 throw thread.err;
1470             }
1471         }
1472
1473         assertTrue(numSuccess.get() > 0);
1474     }
1475
1476     private DistributedLock getLock(String resource, LockCallback callback) {
1477         return (DistributedLock) feature.createLock(resource, DistributedLockManagerTest.OWNER_KEY,
1478             DistributedLockManagerTest.HOLD_SEC, callback, false);
1479     }
1480
1481     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1482         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1483         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1484             oos.writeObject(lock);
1485         }
1486
1487         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1488         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1489             return (DistributedLock) ois.readObject();
1490         }
1491     }
1492
1493     /**
1494      * Runs the checkExpired() action.
1495      *
1496      * @param nskip    number of actions in the work queue to skip
1497      * @param schedSec number of seconds for which the checker should have been scheduled
1498      */
1499     private void runChecker(int nskip, long schedSec) {
1500         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1501         verify(exsvc, times(nskip + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1502         Runnable action = captor.getAllValues().get(nskip);
1503         action.run();
1504     }
1505
1506     /**
1507      * Runs a lock action (e.g., doLock, doUnlock).
1508      *
1509      * @param nskip       number of actions in the work queue to skip
1510      * @param nadditional number of additional actions that appear in the work queue
1511      *                    <i>after</i> the desired action
1512      */
1513     void runLock(int nskip, int nadditional) {
1514         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1515         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1516
1517         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1518         action.run();
1519     }
1520
1521     /**
1522      * Runs a scheduled action (e.g., "retry" action).
1523      *
1524      * @param nskip number of actions in the work queue to skip
1525      */
1526     void runSchedule(int nskip) {
1527         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1528         verify(exsvc, times(PRE_SCHED_EXECS + nskip + 1)).schedule(captor.capture(), anyLong(), any());
1529
1530         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1531         action.run();
1532     }
1533
1534     /**
1535      * Gets a count of the number of lock records in the DB.
1536      *
1537      * @return the number of lock records in the DB
1538      * @throws SQLException if an error occurs accessing the DB
1539      */
1540     private int getRecordCount() throws SQLException {
1541         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1542              ResultSet result = stmt.executeQuery()) {
1543
1544             if (result.next()) {
1545                 return result.getInt(1);
1546
1547             } else {
1548                 return 0;
1549             }
1550         }
1551     }
1552
1553     /**
1554      * Determines if there is a record for the given resource whose expiration time is in
1555      * the expected range.
1556      *
1557      * @param resourceId ID of the resource of interest
1558      * @param uuidString UUID string of the owner
1559      * @param holdSec    seconds for which the lock was to be held
1560      * @param tbegin     earliest time, in milliseconds, at which the record could have been
1561      *                   inserted into the DB
1562      * @return {@code true} if a record is found, {@code false} otherwise
1563      * @throws SQLException if an error occurs accessing the DB
1564      */
1565     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1566         try (PreparedStatement stmt =
1567                  conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1568                      + " WHERE resourceId=? AND host=? AND owner=?")) {
1569
1570             stmt.setString(1, resourceId);
1571             stmt.setString(2, feature.getPdpName());
1572             stmt.setString(3, uuidString);
1573
1574             try (ResultSet result = stmt.executeQuery()) {
1575                 if (result.next()) {
1576                     int remaining = result.getInt(1);
1577                     long maxDiff = System.currentTimeMillis() - tbegin;
1578                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1579
1580                 } else {
1581                     return false;
1582                 }
1583             }
1584         }
1585     }
1586
1587     /**
1588      * Inserts a record into the DB.
1589      *
1590      * @param resourceId   ID of the resource of interest
1591      * @param uuidString   UUID string of the owner
1592      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1593      * @throws SQLException if an error occurs accessing the DB
1594      */
1595     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1596         this.insertRecord(resourceId, feature.getPdpName(), uuidString, expireOffset);
1597     }
1598
1599     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1600         throws SQLException {
1601         try (PreparedStatement stmt =
1602                  conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1603                      + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1604
1605             stmt.setString(1, resourceId);
1606             stmt.setString(2, hostName);
1607             stmt.setString(3, uuidString);
1608             stmt.setInt(4, expireOffset);
1609
1610             assertEquals(1, stmt.executeUpdate());
1611         }
1612     }
1613
1614     /**
1615      * Updates a record in the DB.
1616      *
1617      * @param resourceId   ID of the resource of interest
1618      * @param newUuid      UUID string of the <i>new</i> owner
1619      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1620      * @throws SQLException if an error occurs accessing the DB
1621      */
1622     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1623         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1624             + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1625
1626             stmt.setString(1, newHost);
1627             stmt.setString(2, newUuid);
1628             stmt.setInt(3, expireOffset);
1629             stmt.setString(4, resourceId);
1630
1631             assertEquals(1, stmt.executeUpdate());
1632         }
1633     }
1634
1635     /**
1636      * Feature that uses <i>exsvc</i> to execute requests.
1637      */
1638     private class MyLockingFeature extends DistributedLockManager {
1639
1640         public MyLockingFeature(boolean init) {
1641             shutdownFeature();
1642
1643             exsvc = mock(ScheduledExecutorService.class);
1644             lenient().when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1645             ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1646
1647             if (init) {
1648                 beforeCreateLockManager();
1649                 start();
1650                 afterStart(engine);
1651             }
1652         }
1653     }
1654
1655     /**
1656      * Feature whose data source all throws exceptions.
1657      */
1658     private class InvalidDbLockingFeature extends MyLockingFeature {
1659         private final boolean isTransient;
1660         private boolean freeLock = false;
1661
1662         InvalidDbLockingFeature(boolean isTransient) {
1663             // pass "false" because we have to set the error code BEFORE calling
1664             // afterStart()
1665             super(false);
1666
1667             this.isTransient = isTransient;
1668
1669             this.beforeCreateLockManager();
1670             this.start();
1671             this.afterStart(engine);
1672         }
1673
1674         @Override
1675         protected BasicDataSource makeDataSource() throws SQLException {
1676             lenient().when(dataSource.getConnection()).thenAnswer(answer -> {
1677                 if (freeLock) {
1678                     freeLock = false;
1679                     lock.free();
1680                 }
1681
1682                 throw makeEx();
1683             });
1684
1685             doThrow(makeEx()).when(dataSource).close();
1686
1687             return dataSource;
1688         }
1689
1690         protected SQLException makeEx() {
1691             if (isTransient) {
1692                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1693
1694             } else {
1695                 return new SQLException(EXPECTED_EXCEPTION);
1696             }
1697         }
1698     }
1699
1700     /**
1701      * Feature whose locks free themselves while free() is already running.
1702      */
1703     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1704         private final boolean relock;
1705
1706         public FreeWithFreeLockingFeature(boolean relock) {
1707             super(true);
1708             this.relock = relock;
1709         }
1710
1711         @Override
1712         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1713                                            LockCallback callback) {
1714
1715             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1716                 private static final long serialVersionUID = 1L;
1717                 private boolean checked = false;
1718
1719                 @Override
1720                 public boolean isUnavailable() {
1721                     if (checked) {
1722                         return super.isUnavailable();
1723                     }
1724
1725                     checked = true;
1726
1727                     // release and relock
1728                     free();
1729
1730                     if (relock) {
1731                         // run doUnlock
1732                         runLock(1, 0);
1733
1734                         // relock it
1735                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1736                     }
1737
1738                     return false;
1739                 }
1740             };
1741         }
1742     }
1743
1744     /**
1745      * Thread used with the multithreaded test. It repeatedly attempts to get a lock,
1746      * extend it, and then unlock it.
1747      */
1748     private class MyThread extends Thread {
1749         AssertionError err = null;
1750
1751         public MyThread() {
1752             setDaemon(true);
1753         }
1754
1755         @Override
1756         public void run() {
1757             try {
1758                 for (int x = 0; x < MAX_LOOPS; ++x) {
1759                     makeAttempt();
1760                 }
1761
1762             } catch (AssertionError e) {
1763                 err = e;
1764             }
1765         }
1766
1767         private void makeAttempt() {
1768             try {
1769                 Semaphore sem = new Semaphore(0);
1770
1771                 LockCallback cb = new LockCallback() {
1772                     @Override
1773                     public void lockAvailable(Lock lock) {
1774                         sem.release();
1775                     }
1776
1777                     @Override
1778                     public void lockUnavailable(Lock lock) {
1779                         sem.release();
1780                     }
1781                 };
1782
1783                 Lock newLock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1784
1785                 // wait for callback, whether available or unavailable
1786                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1787                 if (!newLock.isActive()) {
1788                     return;
1789                 }
1790
1791                 numSuccess.incrementAndGet();
1792
1793                 assertEquals(1, numActive.incrementAndGet());
1794
1795                 newLock.extend(HOLD_SEC2, cb);
1796                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1797                 assertTrue(newLock.isActive());
1798
1799                 // decrement BEFORE free()
1800                 numActive.decrementAndGet();
1801
1802                 assertTrue(newLock.free());
1803                 assertTrue(newLock.isUnavailable());
1804
1805             } catch (InterruptedException e) {
1806                 Thread.currentThread().interrupt();
1807                 throw new AssertionError("interrupted", e);
1808             }
1809         }
1810     }
1811 }