Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-distributed-locking / src / test / java / org / onap / policy / distributed / locking / DistributedLockManagerTest.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.distributed.locking;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
27 import static org.assertj.core.api.Assertions.assertThatThrownBy;
28 import static org.junit.jupiter.api.Assertions.assertEquals;
29 import static org.junit.jupiter.api.Assertions.assertFalse;
30 import static org.junit.jupiter.api.Assertions.assertNotNull;
31 import static org.junit.jupiter.api.Assertions.assertNotSame;
32 import static org.junit.jupiter.api.Assertions.assertNull;
33 import static org.junit.jupiter.api.Assertions.assertSame;
34 import static org.junit.jupiter.api.Assertions.assertTrue;
35 import static org.mockito.ArgumentMatchers.any;
36 import static org.mockito.ArgumentMatchers.anyBoolean;
37 import static org.mockito.ArgumentMatchers.anyLong;
38 import static org.mockito.ArgumentMatchers.eq;
39 import static org.mockito.Mockito.doThrow;
40 import static org.mockito.Mockito.lenient;
41 import static org.mockito.Mockito.mock;
42 import static org.mockito.Mockito.never;
43 import static org.mockito.Mockito.times;
44 import static org.mockito.Mockito.verify;
45 import static org.mockito.Mockito.when;
46
47 import java.io.ByteArrayInputStream;
48 import java.io.ByteArrayOutputStream;
49 import java.io.ObjectInputStream;
50 import java.io.ObjectOutputStream;
51 import java.io.Serial;
52 import java.sql.Connection;
53 import java.sql.DriverManager;
54 import java.sql.PreparedStatement;
55 import java.sql.ResultSet;
56 import java.sql.SQLException;
57 import java.sql.SQLTransientException;
58 import java.util.ArrayList;
59 import java.util.List;
60 import java.util.Properties;
61 import java.util.concurrent.Executors;
62 import java.util.concurrent.RejectedExecutionException;
63 import java.util.concurrent.ScheduledExecutorService;
64 import java.util.concurrent.ScheduledFuture;
65 import java.util.concurrent.Semaphore;
66 import java.util.concurrent.TimeUnit;
67 import java.util.concurrent.atomic.AtomicBoolean;
68 import java.util.concurrent.atomic.AtomicInteger;
69 import java.util.concurrent.atomic.AtomicReference;
70 import org.apache.commons.dbcp2.BasicDataSource;
71 import org.junit.jupiter.api.AfterAll;
72 import org.junit.jupiter.api.AfterEach;
73 import org.junit.jupiter.api.BeforeAll;
74 import org.junit.jupiter.api.BeforeEach;
75 import org.junit.jupiter.api.Test;
76 import org.junit.jupiter.api.extension.ExtendWith;
77 import org.kie.api.runtime.KieSession;
78 import org.mockito.ArgumentCaptor;
79 import org.mockito.Mock;
80 import org.mockito.MockitoAnnotations;
81 import org.mockito.junit.jupiter.MockitoExtension;
82 import org.onap.policy.common.utils.services.OrderedServiceImpl;
83 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
84 import org.onap.policy.drools.core.PolicySession;
85 import org.onap.policy.drools.core.lock.Lock;
86 import org.onap.policy.drools.core.lock.LockCallback;
87 import org.onap.policy.drools.core.lock.LockState;
88 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
89 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
90 import org.onap.policy.drools.system.PolicyEngine;
91 import org.onap.policy.drools.system.PolicyEngineConstants;
92 import org.springframework.test.util.ReflectionTestUtils;
93
94 @ExtendWith(MockitoExtension.class)
95 class DistributedLockManagerTest {
96     private static final long EXPIRE_SEC = 900L;
97     private static final long RETRY_SEC = 60L;
98     private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
99     private static final String OTHER_HOST = "other-host";
100     private static final String OTHER_OWNER = "other-owner";
101     private static final String EXPECTED_EXCEPTION = "expected exception";
102     private static final String DB_CONNECTION =
103         "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
104     private static final String DB_USER = "user";
105     private static final String DB_PASSWORD = "password";
106     private static final String OWNER_KEY = "my key";
107     private static final String RESOURCE = "my resource";
108     private static final String RESOURCE2 = "my resource #2";
109     private static final String RESOURCE3 = "my resource #3";
110     private static final String RESOURCE4 = "my resource #4";
111     private static final String RESOURCE5 = "my resource #5";
112     private static final int HOLD_SEC = 100;
113     private static final int HOLD_SEC2 = 120;
114     private static final int MAX_THREADS = 5;
115     private static final int MAX_LOOPS = 100;
116     private static final boolean TRANSIENT = true;
117     private static final boolean PERMANENT = false;
118
119     // number of execute() calls before the first lock attempt
120     private static final int PRE_LOCK_EXECS = 1;
121
122     // number of execute() calls before the first schedule attempt
123     private static final int PRE_SCHED_EXECS = 1;
124
125     private static Connection conn = null;
126     private static ScheduledExecutorService saveExec;
127     private static ScheduledExecutorService realExec;
128
129     @Mock
130     private PolicyEngine engine;
131
132     @Mock
133     private KieSession kieSess;
134
135     @Mock
136     private ScheduledExecutorService exsvc;
137
138     @Mock
139     private ScheduledFuture<?> checker;
140
141     @Mock
142     private LockCallback callback;
143
144     @Mock
145     private BasicDataSource datasrc;
146
147     private DistributedLock lock;
148
149     private AtomicInteger nactive;
150     private AtomicInteger nsuccesses;
151     private DistributedLockManager feature;
152
153     AutoCloseable closeable;
154
155     /**
156      * Configures the location of the property files and creates the DB.
157      *
158      * @throws SQLException if the DB cannot be created
159      */
160     @BeforeAll
161     static void setUpBeforeClass() throws SQLException {
162         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
163         PolicyEngineConstants.getManager().configure(new Properties());
164
165         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
166
167         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
168             + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
169             + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
170             createStmt.executeUpdate();
171         }
172
173         saveExec = (ScheduledExecutorService) ReflectionTestUtils.getField(PolicyEngineConstants.getManager(),
174             POLICY_ENGINE_EXECUTOR_FIELD);
175
176         realExec = Executors.newScheduledThreadPool(3);
177     }
178
179     /**
180      * Restores static fields.
181      */
182     @AfterAll
183     static void tearDownAfterClass() throws SQLException {
184         ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
185         realExec.shutdown();
186         conn.close();
187     }
188
189     /**
190      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
191      * tasks.
192      *
193      * @throws SQLException if the lock records cannot be deleted from the DB
194      */
195     @BeforeEach
196     void setUp() throws SQLException {
197         closeable = MockitoAnnotations.openMocks(this);
198         // grant() and deny() calls will come through here and be immediately executed
199         PolicySession session = new PolicySession(null, null, kieSess) {
200             @Override
201             public void insertDrools(Object object) {
202                 ((Runnable) object).run();
203             }
204         };
205
206         session.setPolicySession();
207
208         nactive = new AtomicInteger(0);
209         nsuccesses = new AtomicInteger(0);
210
211         cleanDb();
212
213         feature = new MyLockingFeature(true);
214     }
215
216     @AfterEach
217     void tearDown() throws Exception {
218         shutdownFeature();
219         cleanDb();
220         closeable.close();
221     }
222
223     private void cleanDb() throws SQLException {
224         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
225             stmt.executeUpdate();
226         }
227     }
228
229     private void shutdownFeature() {
230         if (feature != null) {
231             feature.afterStop(engine);
232             feature = null;
233         }
234     }
235
236     /**
237      * Tests that the feature is found in the expected service sets.
238      */
239     @Test
240     void testServiceApis() {
241         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
242             .anyMatch(obj -> obj instanceof DistributedLockManager));
243     }
244
245     @Test
246     void testGetSequenceNumber() {
247         assertEquals(1000, feature.getSequenceNumber());
248     }
249
250     @Test
251     void testBeforeCreateLockManager() {
252         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
253     }
254
255     /**
256      * Tests beforeCreate(), when getProperties() throws a runtime exception.
257      */
258     @Test
259     void testBeforeCreateLockManagerEx() {
260         shutdownFeature();
261
262         feature = new MyLockingFeature(false) {
263             @Override
264             protected Properties getProperties(String fileName) {
265                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
266             }
267         };
268
269         Properties props = new Properties();
270         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, props))
271             .isInstanceOf(DistributedLockManagerException.class);
272     }
273
274     @Test
275     void testAfterStart() {
276         // verify that cleanup & expire check are both added to the queue
277         verify(exsvc).execute(any());
278         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
279     }
280
281     /**
282      * Tests afterStart(), when thread pool throws a runtime exception.
283      */
284     @Test
285     void testAfterStartExInThreadPool() {
286         shutdownFeature();
287
288         feature = new MyLockingFeature(false);
289
290         doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
291
292         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
293     }
294
295     @Test
296     void testDeleteExpiredDbLocks() throws SQLException {
297         // add records: two expired, one not
298         insertRecord(RESOURCE, feature.getUuidString(), -1);
299         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
300         insertRecord(RESOURCE3, OTHER_OWNER, 0);
301         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
302
303         // get the clean-up function and execute it
304         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
305         verify(exsvc).execute(captor.capture());
306
307         long tbegin = System.currentTimeMillis();
308         Runnable action = captor.getValue();
309         action.run();
310
311         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
312         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
313         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
314         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
315
316         assertEquals(2, getRecordCount());
317     }
318
319     /**
320      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
321      *
322      */
323     @Test
324     void testDeleteExpiredDbLocksEx() {
325         feature = new InvalidDbLockingFeature(TRANSIENT);
326
327         // get the clean-up function and execute it
328         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
329         verify(exsvc).execute(captor.capture());
330
331         Runnable action = captor.getValue();
332
333         // should not throw an exception
334         action.run();
335     }
336
337     @Test
338     void testAfterStop() {
339         shutdownFeature();
340         verify(checker).cancel(anyBoolean());
341
342         feature = new DistributedLockManager();
343
344         // shutdown without calling afterStart()
345
346         shutdownFeature();
347     }
348
349     /**
350      * Tests afterStop(), when the data source throws an exception when close() is called.
351      *
352      */
353     @Test
354     void testAfterStopEx() {
355         shutdownFeature();
356
357         // use a data source that throws an exception when closed
358         feature = new InvalidDbLockingFeature(TRANSIENT);
359
360         assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
361     }
362
363     @Test
364     void testCreateLock() throws SQLException {
365         verify(exsvc).execute(any());
366
367         lock = getLock(RESOURCE, callback);
368         assertTrue(lock.isWaiting());
369
370         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
371
372         // this lock should fail
373         LockCallback callback2 = mock(LockCallback.class);
374         DistributedLock lock2 = getLock(RESOURCE, callback2);
375         assertTrue(lock2.isUnavailable());
376         verify(callback2, never()).lockAvailable(lock2);
377         verify(callback2).lockUnavailable(lock2);
378
379         // this should fail, too
380         LockCallback callback3 = mock(LockCallback.class);
381         DistributedLock lock3 = getLock(RESOURCE, callback3);
382         assertTrue(lock3.isUnavailable());
383         verify(callback3, never()).lockAvailable(lock3);
384         verify(callback3).lockUnavailable(lock3);
385
386         // no change to first
387         assertTrue(lock.isWaiting());
388
389         // no callbacks to the first lock
390         verify(callback, never()).lockAvailable(lock);
391         verify(callback, never()).lockUnavailable(lock);
392
393         assertTrue(lock.isWaiting());
394         assertEquals(0, getRecordCount());
395
396         runLock(0, 0);
397         assertTrue(lock.isActive());
398         assertEquals(1, getRecordCount());
399
400         verify(callback).lockAvailable(lock);
401         verify(callback, never()).lockUnavailable(lock);
402
403         // this should succeed
404         DistributedLock lock4 = getLock(RESOURCE2, callback);
405         assertTrue(lock4.isWaiting());
406
407         // after running checker, original records should still remain
408         runChecker(0, EXPIRE_SEC);
409         assertEquals(1, getRecordCount());
410         verify(callback, never()).lockUnavailable(lock);
411     }
412
413     /**
414      * Tests createLock() when the feature is not the latest instance.
415      */
416     @Test
417     void testCreateLockNotLatestInstance() {
418         DistributedLockManager.setLatestInstance(null);
419
420         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
421         assertTrue(lock.isUnavailable());
422         verify(callback, never()).lockAvailable(any());
423         verify(callback).lockUnavailable(lock);
424     }
425
426     @Test
427     void testCheckExpired() throws SQLException {
428         lock = getLock(RESOURCE, callback);
429         runLock(0, 0);
430
431         LockCallback callback2 = mock(LockCallback.class);
432         final DistributedLock lock2 = getLock(RESOURCE2, callback2);
433         runLock(1, 0);
434
435         LockCallback callback3 = mock(LockCallback.class);
436         final DistributedLock lock3 = getLock(RESOURCE3, callback3);
437         runLock(2, 0);
438
439         LockCallback callback4 = mock(LockCallback.class);
440         final DistributedLock lock4 = getLock(RESOURCE4, callback4);
441         runLock(3, 0);
442
443         LockCallback callback5 = mock(LockCallback.class);
444         final DistributedLock lock5 = getLock(RESOURCE5, callback5);
445         runLock(4, 0);
446
447         assertEquals(5, getRecordCount());
448
449         // expire one record
450         updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
451
452         // change host of another record
453         updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
454
455         // change uuid of another record
456         updateRecord(RESOURCE5, feature.getPdpName(), OTHER_OWNER, HOLD_SEC);
457
458         // run the checker
459         runChecker(0, EXPIRE_SEC);
460
461         // check lock states
462         assertTrue(lock.isUnavailable());
463         assertTrue(lock2.isActive());
464         assertTrue(lock3.isUnavailable());
465         assertTrue(lock4.isActive());
466         assertTrue(lock5.isUnavailable());
467
468         // allow callbacks
469         runLock(2, 2);
470         runLock(3, 1);
471         runLock(4, 0);
472         verify(callback).lockUnavailable(lock);
473         verify(callback3).lockUnavailable(lock3);
474         verify(callback5).lockUnavailable(lock5);
475
476         verify(callback2, never()).lockUnavailable(lock2);
477         verify(callback4, never()).lockUnavailable(lock4);
478
479         // another check should have been scheduled, with the normal interval
480         runChecker(1, EXPIRE_SEC);
481     }
482
483     /**
484      * Tests checkExpired(), when schedule() throws an exception.
485      */
486     @Test
487     void testCheckExpiredExecRejected() {
488         // arrange for execution to be rejected
489         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
490             .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
491
492         runChecker(0, EXPIRE_SEC);
493     }
494
495     /**
496      * Tests checkExpired(), when getConnection() throws an exception.
497      */
498     @Test
499     void testCheckExpiredSqlEx() {
500         // use a data source that throws an exception when getConnection() is called
501         feature = new InvalidDbLockingFeature(TRANSIENT);
502
503         runChecker(0, EXPIRE_SEC);
504
505         // it should have scheduled another check, sooner
506         runChecker(0, RETRY_SEC);
507     }
508
509     /**
510      * Tests checkExpired(), when getConnection() throws an exception and the feature is
511      * no longer alive.
512      */
513     @Test
514     void testCheckExpiredSqlExFeatureStopped() {
515         // use a data source that throws an exception when getConnection() is called
516         feature = new InvalidDbLockingFeature(TRANSIENT) {
517             @Override
518             protected SQLException makeEx() {
519                 this.stop();
520                 return super.makeEx();
521             }
522         };
523
524         runChecker(0, EXPIRE_SEC);
525
526         // it should NOT have scheduled another check
527         verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
528     }
529
530     @Test
531     void testExpireLocks() throws SQLException {
532         AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
533
534         feature = new MyLockingFeature(true) {
535             @Override
536             protected BasicDataSource makeDataSource() throws Exception {
537                 // get the real data source
538                 BasicDataSource src2 = super.makeDataSource();
539
540                 when(datasrc.getConnection()).thenAnswer(answer -> {
541                     DistributedLock lck = freeLock.getAndSet(null);
542                     if (lck != null) {
543                         // free it
544                         lck.free();
545
546                         // run its doUnlock
547                         runLock(4, 0);
548                     }
549
550                     return src2.getConnection();
551                 });
552
553                 return datasrc;
554             }
555         };
556
557         lock = getLock(RESOURCE, callback);
558         runLock(0, 0);
559
560         LockCallback callback2 = mock(LockCallback.class);
561         final DistributedLock lock2 = getLock(RESOURCE2, callback2);
562         runLock(1, 0);
563
564         LockCallback callback3 = mock(LockCallback.class);
565         final DistributedLock lock3 = getLock(RESOURCE3, callback3);
566         // don't run doLock for lock3 - leave it in the waiting state
567
568         LockCallback callback4 = mock(LockCallback.class);
569         final DistributedLock lock4 = getLock(RESOURCE4, callback4);
570         runLock(3, 0);
571
572         assertEquals(3, getRecordCount());
573
574         // expire one record
575         updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
576
577         // arrange to free lock4 while the checker is running
578         freeLock.set(lock4);
579
580         // run the checker
581         runChecker(0, EXPIRE_SEC);
582
583         // check lock states
584         assertTrue(lock.isUnavailable());
585         assertTrue(lock2.isActive());
586         assertTrue(lock3.isWaiting());
587         assertTrue(lock4.isUnavailable());
588
589         runLock(4, 0);
590         verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
591
592         verify(callback).lockUnavailable(lock);
593         verify(callback2, never()).lockUnavailable(lock2);
594         verify(callback3, never()).lockUnavailable(lock3);
595         verify(callback4, never()).lockUnavailable(lock4);
596     }
597
598     @Test
599     void testDistributedLockNoArgs() {
600         DistributedLock lock = new DistributedLock();
601         assertNull(lock.getResourceId());
602         assertNull(lock.getOwnerKey());
603         assertNull(lock.getCallback());
604         assertEquals(0, lock.getHoldSec());
605     }
606
607     @Test
608     void testDistributedLock() {
609         assertThatIllegalArgumentException()
610             .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
611             .withMessageContaining("holdSec");
612
613         // should generate no exception
614         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
615     }
616
617     @Test
618     void testDistributedLockSerializable() throws Exception {
619         DistributedLock lock = getLock(RESOURCE, callback);
620         lock = roundTrip(lock);
621
622         assertTrue(lock.isWaiting());
623
624         assertEquals(RESOURCE, lock.getResourceId());
625         assertEquals(OWNER_KEY, lock.getOwnerKey());
626         assertNull(lock.getCallback());
627         assertEquals(HOLD_SEC, lock.getHoldSec());
628     }
629
630     @Test
631     void testGrant() {
632         lock = getLock(RESOURCE, callback);
633         assertFalse(lock.isActive());
634
635         // execute the doLock() call
636         runLock(0, 0);
637
638         assertTrue(lock.isActive());
639
640         // the callback for the lock should have been run in the foreground thread
641         verify(callback).lockAvailable(lock);
642     }
643
644     @Test
645     void testDistributedLockDeny() {
646         // get a lock
647         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
648
649         // get another lock - should fail
650         lock = getLock(RESOURCE, callback);
651
652         assertTrue(lock.isUnavailable());
653
654         // the callback for the second lock should have been run in the foreground thread
655         verify(callback).lockUnavailable(lock);
656
657         // should only have a request for the first lock
658         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
659     }
660
661     @Test
662     void testDistributedLockFree() {
663         lock = getLock(RESOURCE, callback);
664
665         assertTrue(lock.free());
666         assertTrue(lock.isUnavailable());
667
668         // run both requests associated with the lock
669         runLock(0, 1);
670         runLock(1, 0);
671
672         // should not have changed state
673         assertTrue(lock.isUnavailable());
674
675         // attempt to free it again
676         assertFalse(lock.free());
677
678         // should not have queued anything else
679         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
680
681         // new lock should succeed
682         DistributedLock lock2 = getLock(RESOURCE, callback);
683         assertNotSame(lock2, lock);
684         assertTrue(lock2.isWaiting());
685     }
686
687     /**
688      * Tests that free() works on a serialized lock with a new feature.
689      *
690      * @throws Exception if an error occurs
691      */
692     @Test
693     void testDistributedLockFreeSerialized() throws Exception {
694         DistributedLock lock = getLock(RESOURCE, callback);
695
696         feature = new MyLockingFeature(true);
697
698         lock = roundTrip(lock);
699         assertTrue(lock.free());
700         assertTrue(lock.isUnavailable());
701     }
702
703     /**
704      * Tests free() on a serialized lock without a feature.
705      *
706      * @throws Exception if an error occurs
707      */
708     @Test
709     void testDistributedLockFreeNoFeature() throws Exception {
710         DistributedLock lock = getLock(RESOURCE, callback);
711
712         DistributedLockManager.setLatestInstance(null);
713
714         lock = roundTrip(lock);
715         assertFalse(lock.free());
716         assertTrue(lock.isUnavailable());
717     }
718
719     /**
720      * Tests the case where the lock is freed and doUnlock called between the call to
721      * isUnavailable() and the call to compute().
722      */
723     @Test
724     void testDistributedLockFreeUnlocked() {
725         feature = new FreeWithFreeLockingFeature(true);
726
727         lock = getLock(RESOURCE, callback);
728
729         assertFalse(lock.free());
730         assertTrue(lock.isUnavailable());
731     }
732
733     /**
734      * Tests the case where the lock is freed, but doUnlock is not completed, between the
735      * call to isUnavailable() and the call to compute().
736      */
737     @Test
738     void testDistributedLockFreeLockFreed() {
739         feature = new FreeWithFreeLockingFeature(false);
740
741         lock = getLock(RESOURCE, callback);
742
743         assertFalse(lock.free());
744         assertTrue(lock.isUnavailable());
745     }
746
747     @Test
748     void testDistributedLockExtend() {
749         lock = getLock(RESOURCE, callback);
750
751         // lock2 should be denied - called back by this thread
752         DistributedLock lock2 = getLock(RESOURCE, callback);
753         verify(callback, never()).lockAvailable(lock2);
754         verify(callback).lockUnavailable(lock2);
755
756         // lock2 will still be denied - called back by this thread
757         lock2.extend(HOLD_SEC, callback);
758         verify(callback, times(2)).lockUnavailable(lock2);
759
760         // force lock2 to be active - should still be denied
761         ReflectionTestUtils.setField(lock2, "state", LockState.ACTIVE);
762         lock2.extend(HOLD_SEC, callback);
763         verify(callback, times(3)).lockUnavailable(lock2);
764
765         assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
766             .withMessageContaining("holdSec");
767
768         // execute doLock()
769         runLock(0, 0);
770         assertTrue(lock.isActive());
771
772         // now extend the first lock
773         LockCallback callback2 = mock(LockCallback.class);
774         lock.extend(HOLD_SEC2, callback2);
775         assertTrue(lock.isWaiting());
776
777         // execute doExtend()
778         runLock(1, 0);
779         lock.extend(HOLD_SEC2, callback2);
780         assertEquals(HOLD_SEC2, lock.getHoldSec());
781         verify(callback2).lockAvailable(lock);
782         verify(callback2, never()).lockUnavailable(lock);
783     }
784
785     /**
786      * Tests that extend() works on a serialized lock with a new feature.
787      *
788      * @throws Exception if an error occurs
789      */
790     @Test
791     void testDistributedLockExtendSerialized() throws Exception {
792         DistributedLock lock = getLock(RESOURCE, callback);
793
794         // run doLock
795         runLock(0, 0);
796         assertTrue(lock.isActive());
797
798         feature = new MyLockingFeature(true);
799
800         lock = roundTrip(lock);
801         assertTrue(lock.isActive());
802
803         LockCallback scallback = mock(LockCallback.class);
804
805         lock.extend(HOLD_SEC, scallback);
806         assertTrue(lock.isWaiting());
807
808         // run doExtend (in new feature)
809         runLock(0, 0);
810         assertTrue(lock.isActive());
811
812         verify(scallback).lockAvailable(lock);
813         verify(scallback, never()).lockUnavailable(lock);
814     }
815
816     /**
817      * Tests extend() on a serialized lock without a feature.
818      *
819      * @throws Exception if an error occurs
820      */
821     @Test
822     void testDistributedLockExtendNoFeature() throws Exception {
823         DistributedLock lock = getLock(RESOURCE, callback);
824
825         // run doLock
826         runLock(0, 0);
827         assertTrue(lock.isActive());
828
829         DistributedLockManager.setLatestInstance(null);
830
831         lock = roundTrip(lock);
832         assertTrue(lock.isActive());
833
834         LockCallback scallback = mock(LockCallback.class);
835
836         lock.extend(HOLD_SEC, scallback);
837         assertTrue(lock.isUnavailable());
838
839         verify(scallback, never()).lockAvailable(lock);
840         verify(scallback).lockUnavailable(lock);
841     }
842
843     /**
844      * Tests the case where the lock is freed and doUnlock called between the call to
845      * isUnavailable() and the call to compute().
846      */
847     @Test
848     void testDistributedLockExtendUnlocked() {
849         feature = new FreeWithFreeLockingFeature(true);
850
851         lock = getLock(RESOURCE, callback);
852
853         lock.extend(HOLD_SEC2, callback);
854         assertTrue(lock.isUnavailable());
855         verify(callback).lockUnavailable(lock);
856     }
857
858     /**
859      * Tests the case where the lock is freed, but doUnlock is not completed, between the
860      * call to isUnavailable() and the call to compute().
861      */
862     @Test
863     void testDistributedLockExtendLockFreed() {
864         feature = new FreeWithFreeLockingFeature(false);
865
866         lock = getLock(RESOURCE, callback);
867
868         lock.extend(HOLD_SEC2, callback);
869         assertTrue(lock.isUnavailable());
870         verify(callback).lockUnavailable(lock);
871     }
872
873     @Test
874     void testDistributedLockScheduleRequest() {
875         lock = getLock(RESOURCE, callback);
876         runLock(0, 0);
877
878         verify(callback).lockAvailable(lock);
879     }
880
881     @Test
882     void testDistributedLockRescheduleRequest() {
883         // use a data source that throws an exception when getConnection() is called
884         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
885         feature = invfeat;
886
887         lock = getLock(RESOURCE, callback);
888
889         // invoke doLock - should fail and reschedule
890         runLock(0, 0);
891
892         // should still be waiting
893         assertTrue(lock.isWaiting());
894         verify(callback, never()).lockUnavailable(lock);
895
896         // free the lock while doLock is executing
897         invfeat.freeLock = true;
898
899         // try scheduled request - should just invoke doUnlock
900         runSchedule(0);
901
902         // should still be waiting
903         assertTrue(lock.isUnavailable());
904         verify(callback, never()).lockUnavailable(lock);
905
906         // should have scheduled a retry of doUnlock
907         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
908     }
909
910     @Test
911     void testDistributedLockGetNextRequest() {
912         lock = getLock(RESOURCE, callback);
913
914         /*
915          * run doLock. This should cause getNextRequest() to be called twice, once with a
916          * request in the queue, and the second time with request=null.
917          */
918         runLock(0, 0);
919     }
920
921     /**
922      * Tests getNextRequest(), where the same request is still in the queue the second
923      * time it's called.
924      */
925     @Test
926     void testDistributedLockGetNextRequestSameRequest() {
927         // force reschedule to be invoked
928         feature = new InvalidDbLockingFeature(TRANSIENT);
929
930         lock = getLock(RESOURCE, callback);
931
932         /*
933          * run doLock. This should cause getNextRequest() to be called twice, once with a
934          * request in the queue, and the second time with the same request again.
935          */
936         runLock(0, 0);
937
938         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
939     }
940
941     @Test
942     void testDistributedLockDoRequest() {
943         lock = getLock(RESOURCE, callback);
944
945         assertTrue(lock.isWaiting());
946
947         // run doLock via doRequest
948         runLock(0, 0);
949
950         assertTrue(lock.isActive());
951     }
952
953     /**
954      * Tests doRequest(), when doRequest() is already running within another thread.
955      */
956     @Test
957     void testDistributedLockDoRequestBusy() {
958         /*
959          * this feature will invoke a request in a background thread while it's being run
960          * in a foreground thread.
961          */
962         AtomicBoolean running = new AtomicBoolean(false);
963         AtomicBoolean returned = new AtomicBoolean(false);
964
965         feature = new MyLockingFeature(true) {
966             @Override
967             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
968                 LockCallback callback) {
969                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
970                     @Serial
971                     private static final long serialVersionUID = 1L;
972
973                     @Override
974                     protected boolean doDbInsert(Connection conn) throws SQLException {
975                         if (running.get()) {
976                             // already inside the thread - don't recurse any further
977                             return super.doDbInsert(conn);
978                         }
979
980                         running.set(true);
981
982                         Thread thread = new Thread(() -> {
983                             // run doLock from within the new thread
984                             runLock(0, 0);
985                         });
986                         thread.setDaemon(true);
987                         thread.start();
988
989                         // wait for the background thread to complete before continuing
990                         try {
991                             thread.join(5000);
992                         } catch (InterruptedException ignore) {
993                             Thread.currentThread().interrupt();
994                         }
995
996                         returned.set(!thread.isAlive());
997
998                         return super.doDbInsert(conn);
999                     }
1000                 };
1001             }
1002         };
1003
1004         lock = getLock(RESOURCE, callback);
1005
1006         // run doLock
1007         runLock(0, 0);
1008
1009         assertTrue(returned.get());
1010     }
1011
1012     /**
1013      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1014      *
1015      * @throws SQLException if an error occurs
1016      */
1017     @Test
1018     void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1019         // throw run-time exception
1020         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1021
1022         // use a data source that throws an exception when getConnection() is called
1023         feature = new MyLockingFeature(true) {
1024             @Override
1025             protected BasicDataSource makeDataSource() {
1026                 return datasrc;
1027             }
1028         };
1029
1030         lock = getLock(RESOURCE, callback);
1031
1032         // invoke doLock - should NOT reschedule
1033         runLock(0, 0);
1034
1035         assertTrue(lock.isUnavailable());
1036         verify(callback).lockUnavailable(lock);
1037
1038         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1039     }
1040
1041     /**
1042      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1043      * state.
1044      *
1045      * @throws SQLException if an error occurs
1046      */
1047     @Test
1048     void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1049         // throw run-time exception
1050         when(datasrc.getConnection()).thenAnswer(answer -> {
1051             lock.free();
1052             throw new IllegalStateException(EXPECTED_EXCEPTION);
1053         });
1054
1055         // use a data source that throws an exception when getConnection() is called
1056         feature = new MyLockingFeature(true) {
1057             @Override
1058             protected BasicDataSource makeDataSource() {
1059                 return datasrc;
1060             }
1061         };
1062
1063         lock = getLock(RESOURCE, callback);
1064
1065         // invoke doLock - should NOT reschedule
1066         runLock(0, 0);
1067
1068         assertTrue(lock.isUnavailable());
1069         verify(callback, never()).lockUnavailable(lock);
1070
1071         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1072     }
1073
1074     /**
1075      * Tests doRequest() when the retry count gets exhausted.
1076      */
1077     @Test
1078     void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1079         // use a data source that throws an exception when getConnection() is called
1080         feature = new InvalidDbLockingFeature(TRANSIENT);
1081
1082         lock = getLock(RESOURCE, callback);
1083
1084         // invoke doLock - should fail and reschedule
1085         runLock(0, 0);
1086
1087         // should still be waiting
1088         assertTrue(lock.isWaiting());
1089         verify(callback, never()).lockUnavailable(lock);
1090
1091         // try again, via SCHEDULER - first retry fails
1092         runSchedule(0);
1093
1094         // should still be waiting
1095         assertTrue(lock.isWaiting());
1096         verify(callback, never()).lockUnavailable(lock);
1097
1098         // try again, via SCHEDULER - final retry fails
1099         runSchedule(1);
1100         assertTrue(lock.isUnavailable());
1101
1102         // now callback should have been called
1103         verify(callback).lockUnavailable(lock);
1104     }
1105
1106     /**
1107      * Tests doRequest() when a non-transient DB exception is thrown.
1108      */
1109     @Test
1110     void testDistributedLockDoRequestNotTransient() {
1111         /*
1112          * use a data source that throws a PERMANENT exception when getConnection() is
1113          * called
1114          */
1115         feature = new InvalidDbLockingFeature(PERMANENT);
1116
1117         lock = getLock(RESOURCE, callback);
1118
1119         // invoke doLock - should fail
1120         runLock(0, 0);
1121
1122         assertTrue(lock.isUnavailable());
1123         verify(callback).lockUnavailable(lock);
1124
1125         // should not have scheduled anything new
1126         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1127         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1128     }
1129
1130     @Test
1131     void testDistributedLockDoLock() throws SQLException {
1132         lock = getLock(RESOURCE, callback);
1133
1134         // invoke doLock - should simply do an insert
1135         long tbegin = System.currentTimeMillis();
1136         runLock(0, 0);
1137
1138         assertEquals(1, getRecordCount());
1139         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1140         verify(callback).lockAvailable(lock);
1141     }
1142
1143     /**
1144      * Tests doLock() when the lock is freed before doLock runs.
1145      *
1146      * @throws SQLException if an error occurs
1147      */
1148     @Test
1149     void testDistributedLockDoLockFreed() throws SQLException {
1150         lock = getLock(RESOURCE, callback);
1151
1152         lock.setState(LockState.UNAVAILABLE);
1153
1154         // invoke doLock - should do nothing
1155         runLock(0, 0);
1156
1157         assertEquals(0, getRecordCount());
1158
1159         verify(callback, never()).lockAvailable(lock);
1160     }
1161
1162     /**
1163      * Tests doLock() when a DB exception is thrown.
1164      */
1165     @Test
1166     void testDistributedLockDoLockEx() {
1167         // use a data source that throws an exception when getConnection() is called
1168         feature = new InvalidDbLockingFeature(PERMANENT);
1169
1170         lock = getLock(RESOURCE, callback);
1171
1172         // invoke doLock - should simply do an insert
1173         runLock(0, 0);
1174
1175         // lock should have failed due to exception
1176         verify(callback).lockUnavailable(lock);
1177     }
1178
1179     /**
1180      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1181      * to be called.
1182      */
1183     @Test
1184     void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1185         // insert an expired record
1186         insertRecord(RESOURCE, feature.getUuidString(), 0);
1187
1188         lock = getLock(RESOURCE, callback);
1189
1190         // invoke doLock - should simply do an update
1191         runLock(0, 0);
1192         verify(callback).lockAvailable(lock);
1193     }
1194
1195     /**
1196      * Tests doLock() when a locked record already exists.
1197      */
1198     @Test
1199     void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1200         // insert an expired record
1201         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1202
1203         lock = getLock(RESOURCE, callback);
1204
1205         // invoke doLock
1206         runLock(0, 0);
1207
1208         // lock should have failed because it's already locked
1209         verify(callback).lockUnavailable(lock);
1210     }
1211
1212     @Test
1213     void testDistributedLockDoUnlock() throws SQLException {
1214         lock = getLock(RESOURCE, callback);
1215
1216         // invoke doLock()
1217         runLock(0, 0);
1218
1219         lock.free();
1220
1221         // invoke doUnlock()
1222         long tbegin = System.currentTimeMillis();
1223         runLock(1, 0);
1224
1225         assertEquals(0, getRecordCount());
1226         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1227
1228         assertTrue(lock.isUnavailable());
1229
1230         // no more callbacks should have occurred
1231         verify(callback, times(1)).lockAvailable(lock);
1232         verify(callback, never()).lockUnavailable(lock);
1233     }
1234
1235     /**
1236      * Tests doUnlock() when a DB exception is thrown.
1237      *
1238      */
1239     @Test
1240     void testDistributedLockDoUnlockEx() {
1241         feature = new InvalidDbLockingFeature(PERMANENT);
1242
1243         lock = getLock(RESOURCE, callback);
1244
1245         // do NOT invoke doLock() - it will fail without a DB connection
1246
1247         lock.free();
1248
1249         // invoke doUnlock()
1250         runLock(1, 0);
1251
1252         assertTrue(lock.isUnavailable());
1253
1254         // no more callbacks should have occurred
1255         verify(callback, never()).lockAvailable(lock);
1256         verify(callback, never()).lockUnavailable(lock);
1257     }
1258
1259     @Test
1260     void testDistributedLockDoExtend() throws SQLException {
1261         lock = getLock(RESOURCE, callback);
1262         runLock(0, 0);
1263
1264         LockCallback callback2 = mock(LockCallback.class);
1265         lock.extend(HOLD_SEC2, callback2);
1266
1267         // call doExtend()
1268         long tbegin = System.currentTimeMillis();
1269         runLock(1, 0);
1270
1271         assertEquals(1, getRecordCount());
1272         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1273
1274         assertTrue(lock.isActive());
1275
1276         // no more callbacks should have occurred
1277         verify(callback).lockAvailable(lock);
1278         verify(callback, never()).lockUnavailable(lock);
1279
1280         // extension should have succeeded
1281         verify(callback2).lockAvailable(lock);
1282         verify(callback2, never()).lockUnavailable(lock);
1283     }
1284
1285     /**
1286      * Tests doExtend() when the lock is freed before doExtend runs.
1287      *
1288      * @throws SQLException if an error occurs
1289      */
1290     @Test
1291     void testDistributedLockDoExtendFreed() throws SQLException {
1292         lock = getLock(RESOURCE, callback);
1293         lock.extend(HOLD_SEC2, callback);
1294
1295         lock.setState(LockState.UNAVAILABLE);
1296
1297         // invoke doExtend - should do nothing
1298         runLock(1, 0);
1299
1300         assertEquals(0, getRecordCount());
1301
1302         verify(callback, never()).lockAvailable(lock);
1303     }
1304
1305     /**
1306      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1307      * insert.
1308      *
1309      * @throws SQLException if an error occurs
1310      */
1311     @Test
1312     void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1313         lock = getLock(RESOURCE, callback);
1314         runLock(0, 0);
1315
1316         LockCallback callback2 = mock(LockCallback.class);
1317         lock.extend(HOLD_SEC2, callback2);
1318
1319         // delete the record so it's forced to re-insert it
1320         cleanDb();
1321
1322         // call doExtend()
1323         long tbegin = System.currentTimeMillis();
1324         runLock(1, 0);
1325
1326         assertEquals(1, getRecordCount());
1327         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1328
1329         assertTrue(lock.isActive());
1330
1331         // no more callbacks should have occurred
1332         verify(callback).lockAvailable(lock);
1333         verify(callback, never()).lockUnavailable(lock);
1334
1335         // extension should have succeeded
1336         verify(callback2).lockAvailable(lock);
1337         verify(callback2, never()).lockUnavailable(lock);
1338     }
1339
1340     /**
1341      * Tests doExtend() when both update and insert fail.
1342      *
1343      */
1344     @Test
1345     void testDistributedLockDoExtendNeitherSucceeds() {
1346         /*
1347          * this feature will create a lock that returns false when doDbUpdate() is
1348          * invoked, or when doDbInsert() is invoked a second time
1349          */
1350         feature = new MyLockingFeature(true) {
1351             @Override
1352             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1353                 LockCallback callback) {
1354                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1355                     @Serial
1356                     private static final long serialVersionUID = 1L;
1357                     private int ntimes = 0;
1358
1359                     @Override
1360                     protected boolean doDbInsert(Connection conn) throws SQLException {
1361                         if (ntimes++ > 0) {
1362                             return false;
1363                         }
1364
1365                         return super.doDbInsert(conn);
1366                     }
1367
1368                     @Override
1369                     protected boolean doDbUpdate(Connection conn) {
1370                         return false;
1371                     }
1372                 };
1373             }
1374         };
1375
1376         lock = getLock(RESOURCE, callback);
1377         runLock(0, 0);
1378
1379         LockCallback callback2 = mock(LockCallback.class);
1380         lock.extend(HOLD_SEC2, callback2);
1381
1382         // call doExtend()
1383         runLock(1, 0);
1384
1385         assertTrue(lock.isUnavailable());
1386
1387         // no more callbacks should have occurred
1388         verify(callback).lockAvailable(lock);
1389         verify(callback, never()).lockUnavailable(lock);
1390
1391         // extension should have failed
1392         verify(callback2, never()).lockAvailable(lock);
1393         verify(callback2).lockUnavailable(lock);
1394     }
1395
1396     /**
1397      * Tests doExtend() when an exception occurs.
1398      *
1399      * @throws SQLException if an error occurs
1400      */
1401     @Test
1402     void testDistributedLockDoExtendEx() throws SQLException {
1403         lock = getLock(RESOURCE, callback);
1404         runLock(0, 0);
1405
1406         /*
1407          * delete the record and insert one with a different owner, which will cause
1408          * doDbInsert() to throw an exception
1409          */
1410         cleanDb();
1411         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1412
1413         LockCallback callback2 = mock(LockCallback.class);
1414         lock.extend(HOLD_SEC2, callback2);
1415
1416         // call doExtend()
1417         runLock(1, 0);
1418
1419         assertTrue(lock.isUnavailable());
1420
1421         // no more callbacks should have occurred
1422         verify(callback).lockAvailable(lock);
1423         verify(callback, never()).lockUnavailable(lock);
1424
1425         // extension should have failed
1426         verify(callback2, never()).lockAvailable(lock);
1427         verify(callback2).lockUnavailable(lock);
1428     }
1429
1430     @Test
1431     void testDistributedLockToString() {
1432         String text = getLock(RESOURCE, callback).toString();
1433         assertNotNull(text);
1434         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1435     }
1436
1437     @Test
1438     void testMakeThreadPool() {
1439         // use a REAL feature to test this
1440         feature = new DistributedLockManager();
1441
1442         // this should create a thread pool
1443         feature.beforeCreateLockManager(engine, new Properties());
1444         feature.afterStart(engine);
1445
1446         assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
1447     }
1448
1449     /**
1450      * Performs a multithreaded test of the locking facility.
1451      *
1452      * @throws InterruptedException if the current thread is interrupted while waiting for
1453      *         the background threads to complete
1454      */
1455     @Test
1456     void testMultiThreaded() throws InterruptedException {
1457         ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1458
1459         feature = new DistributedLockManager();
1460         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1461         feature.afterStart(PolicyEngineConstants.getManager());
1462
1463         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1464         for (int x = 0; x < MAX_THREADS; ++x) {
1465             threads.add(new MyThread());
1466         }
1467
1468         threads.forEach(Thread::start);
1469
1470         for (MyThread thread : threads) {
1471             thread.join(6000);
1472             assertFalse(thread.isAlive());
1473         }
1474
1475         for (MyThread thread : threads) {
1476             if (thread.err != null) {
1477                 throw thread.err;
1478             }
1479         }
1480
1481         assertTrue(nsuccesses.get() > 0);
1482     }
1483
1484     private DistributedLock getLock(String resource, LockCallback callback) {
1485         return (DistributedLock) feature.createLock(resource, DistributedLockManagerTest.OWNER_KEY,
1486             DistributedLockManagerTest.HOLD_SEC, callback, false);
1487     }
1488
1489     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1490         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1491         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1492             oos.writeObject(lock);
1493         }
1494
1495         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1496         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1497             return (DistributedLock) ois.readObject();
1498         }
1499     }
1500
1501     /**
1502      * Runs the checkExpired() action.
1503      *
1504      * @param nskip    number of actions in the work queue to skip
1505      * @param schedSec number of seconds for which the checker should have been scheduled
1506      */
1507     private void runChecker(int nskip, long schedSec) {
1508         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1509         verify(exsvc, times(nskip + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1510         Runnable action = captor.getAllValues().get(nskip);
1511         action.run();
1512     }
1513
1514     /**
1515      * Runs a lock action (e.g., doLock, doUnlock).
1516      *
1517      * @param nskip number of actions in the work queue to skip
1518      * @param nadditional number of additional actions that appear in the work queue
1519      *        <i>after</i> the desired action
1520      */
1521     void runLock(int nskip, int nadditional) {
1522         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1523         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1524
1525         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1526         action.run();
1527     }
1528
1529     /**
1530      * Runs a scheduled action (e.g., "retry" action).
1531      *
1532      * @param nskip number of actions in the work queue to skip
1533      */
1534     void runSchedule(int nskip) {
1535         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1536         verify(exsvc, times(PRE_SCHED_EXECS + nskip + 1)).schedule(captor.capture(), anyLong(), any());
1537
1538         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1539         action.run();
1540     }
1541
1542     /**
1543      * Gets a count of the number of lock records in the DB.
1544      *
1545      * @return the number of lock records in the DB
1546      * @throws SQLException if an error occurs accessing the DB
1547      */
1548     private int getRecordCount() throws SQLException {
1549         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1550             ResultSet result = stmt.executeQuery()) {
1551
1552             if (result.next()) {
1553                 return result.getInt(1);
1554
1555             } else {
1556                 return 0;
1557             }
1558         }
1559     }
1560
1561     /**
1562      * Determines if there is a record for the given resource whose expiration time is in
1563      * the expected range.
1564      *
1565      * @param resourceId ID of the resource of interest
1566      * @param uuidString UUID string of the owner
1567      * @param holdSec seconds for which the lock was to be held
1568      * @param tbegin earliest time, in milliseconds, at which the record could have been
1569      *        inserted into the DB
1570      * @return {@code true} if a record is found, {@code false} otherwise
1571      * @throws SQLException if an error occurs accessing the DB
1572      */
1573     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1574         try (PreparedStatement stmt =
1575             conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1576                 + " WHERE resourceId=? AND host=? AND owner=?")) {
1577
1578             stmt.setString(1, resourceId);
1579             stmt.setString(2, feature.getPdpName());
1580             stmt.setString(3, uuidString);
1581
1582             try (ResultSet result = stmt.executeQuery()) {
1583                 if (result.next()) {
1584                     int remaining = result.getInt(1);
1585                     long maxDiff = System.currentTimeMillis() - tbegin;
1586                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1587
1588                 } else {
1589                     return false;
1590                 }
1591             }
1592         }
1593     }
1594
1595     /**
1596      * Inserts a record into the DB.
1597      *
1598      * @param resourceId ID of the resource of interest
1599      * @param uuidString UUID string of the owner
1600      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1601      * @throws SQLException if an error occurs accessing the DB
1602      */
1603     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1604         this.insertRecord(resourceId, feature.getPdpName(), uuidString, expireOffset);
1605     }
1606
1607     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1608         throws SQLException {
1609         try (PreparedStatement stmt =
1610             conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1611                 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1612
1613             stmt.setString(1, resourceId);
1614             stmt.setString(2, hostName);
1615             stmt.setString(3, uuidString);
1616             stmt.setInt(4, expireOffset);
1617
1618             assertEquals(1, stmt.executeUpdate());
1619         }
1620     }
1621
1622     /**
1623      * Updates a record in the DB.
1624      *
1625      * @param resourceId ID of the resource of interest
1626      * @param newUuid UUID string of the <i>new</i> owner
1627      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1628      * @throws SQLException if an error occurs accessing the DB
1629      */
1630     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1631         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1632             + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1633
1634             stmt.setString(1, newHost);
1635             stmt.setString(2, newUuid);
1636             stmt.setInt(3, expireOffset);
1637             stmt.setString(4, resourceId);
1638
1639             assertEquals(1, stmt.executeUpdate());
1640         }
1641     }
1642
1643     /**
1644      * Feature that uses <i>exsvc</i> to execute requests.
1645      */
1646     private class MyLockingFeature extends DistributedLockManager {
1647
1648         public MyLockingFeature(boolean init) {
1649             shutdownFeature();
1650
1651             exsvc = mock(ScheduledExecutorService.class);
1652             lenient().when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1653             ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1654
1655             if (init) {
1656                 beforeCreateLockManager(engine, new Properties());
1657                 start();
1658                 afterStart(engine);
1659             }
1660         }
1661     }
1662
1663     /**
1664      * Feature whose data source all throws exceptions.
1665      */
1666     private class InvalidDbLockingFeature extends MyLockingFeature {
1667         private final boolean isTransient;
1668         private boolean freeLock = false;
1669
1670         InvalidDbLockingFeature(boolean isTransient) {
1671             // pass "false" because we have to set the error code BEFORE calling
1672             // afterStart()
1673             super(false);
1674
1675             this.isTransient = isTransient;
1676
1677             this.beforeCreateLockManager(engine, new Properties());
1678             this.start();
1679             this.afterStart(engine);
1680         }
1681
1682         @Override
1683         protected BasicDataSource makeDataSource() throws Exception {
1684             lenient().when(datasrc.getConnection()).thenAnswer(answer -> {
1685                 if (freeLock) {
1686                     freeLock = false;
1687                     lock.free();
1688                 }
1689
1690                 throw makeEx();
1691             });
1692
1693             doThrow(makeEx()).when(datasrc).close();
1694
1695             return datasrc;
1696         }
1697
1698         protected SQLException makeEx() {
1699             if (isTransient) {
1700                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1701
1702             } else {
1703                 return new SQLException(EXPECTED_EXCEPTION);
1704             }
1705         }
1706     }
1707
1708     /**
1709      * Feature whose locks free themselves while free() is already running.
1710      */
1711     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1712         private final boolean relock;
1713
1714         public FreeWithFreeLockingFeature(boolean relock) {
1715             super(true);
1716             this.relock = relock;
1717         }
1718
1719         @Override
1720         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1721             LockCallback callback) {
1722
1723             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1724                 @Serial
1725                 private static final long serialVersionUID = 1L;
1726                 private boolean checked = false;
1727
1728                 @Override
1729                 public boolean isUnavailable() {
1730                     if (checked) {
1731                         return super.isUnavailable();
1732                     }
1733
1734                     checked = true;
1735
1736                     // release and relock
1737                     free();
1738
1739                     if (relock) {
1740                         // run doUnlock
1741                         runLock(1, 0);
1742
1743                         // relock it
1744                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1745                     }
1746
1747                     return false;
1748                 }
1749             };
1750         }
1751     }
1752
1753     /**
1754      * Thread used with the multithreaded test. It repeatedly attempts to get a lock,
1755      * extend it, and then unlock it.
1756      */
1757     private class MyThread extends Thread {
1758         AssertionError err = null;
1759
1760         public MyThread() {
1761             setDaemon(true);
1762         }
1763
1764         @Override
1765         public void run() {
1766             try {
1767                 for (int x = 0; x < MAX_LOOPS; ++x) {
1768                     makeAttempt();
1769                 }
1770
1771             } catch (AssertionError e) {
1772                 err = e;
1773             }
1774         }
1775
1776         private void makeAttempt() {
1777             try {
1778                 Semaphore sem = new Semaphore(0);
1779
1780                 LockCallback cb = new LockCallback() {
1781                     @Override
1782                     public void lockAvailable(Lock lock) {
1783                         sem.release();
1784                     }
1785
1786                     @Override
1787                     public void lockUnavailable(Lock lock) {
1788                         sem.release();
1789                     }
1790                 };
1791
1792                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1793
1794                 // wait for callback, whether available or unavailable
1795                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1796                 if (!lock.isActive()) {
1797                     return;
1798                 }
1799
1800                 nsuccesses.incrementAndGet();
1801
1802                 assertEquals(1, nactive.incrementAndGet());
1803
1804                 lock.extend(HOLD_SEC2, cb);
1805                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1806                 assertTrue(lock.isActive());
1807
1808                 // decrement BEFORE free()
1809                 nactive.decrementAndGet();
1810
1811                 assertTrue(lock.free());
1812                 assertTrue(lock.isUnavailable());
1813
1814             } catch (InterruptedException e) {
1815                 Thread.currentThread().interrupt();
1816                 throw new AssertionError("interrupted", e);
1817             }
1818         }
1819     }
1820 }