2e173cf01789e404a8d1e68df36f5dc0be6a195b
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.distributed.locking;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
27 import static org.assertj.core.api.Assertions.assertThatThrownBy;
28 import static org.junit.jupiter.api.Assertions.assertEquals;
29 import static org.junit.jupiter.api.Assertions.assertFalse;
30 import static org.junit.jupiter.api.Assertions.assertNotNull;
31 import static org.junit.jupiter.api.Assertions.assertNotSame;
32 import static org.junit.jupiter.api.Assertions.assertNull;
33 import static org.junit.jupiter.api.Assertions.assertSame;
34 import static org.junit.jupiter.api.Assertions.assertTrue;
35 import static org.mockito.ArgumentMatchers.any;
36 import static org.mockito.ArgumentMatchers.anyBoolean;
37 import static org.mockito.ArgumentMatchers.anyLong;
38 import static org.mockito.ArgumentMatchers.eq;
39 import static org.mockito.Mockito.doThrow;
40 import static org.mockito.Mockito.lenient;
41 import static org.mockito.Mockito.mock;
42 import static org.mockito.Mockito.never;
43 import static org.mockito.Mockito.times;
44 import static org.mockito.Mockito.verify;
45 import static org.mockito.Mockito.when;
46
47 import java.io.ByteArrayInputStream;
48 import java.io.ByteArrayOutputStream;
49 import java.io.ObjectInputStream;
50 import java.io.ObjectOutputStream;
51 import java.sql.Connection;
52 import java.sql.DriverManager;
53 import java.sql.PreparedStatement;
54 import java.sql.ResultSet;
55 import java.sql.SQLException;
56 import java.sql.SQLTransientException;
57 import java.util.ArrayList;
58 import java.util.List;
59 import java.util.Properties;
60 import java.util.concurrent.Executors;
61 import java.util.concurrent.RejectedExecutionException;
62 import java.util.concurrent.ScheduledExecutorService;
63 import java.util.concurrent.ScheduledFuture;
64 import java.util.concurrent.Semaphore;
65 import java.util.concurrent.TimeUnit;
66 import java.util.concurrent.atomic.AtomicBoolean;
67 import java.util.concurrent.atomic.AtomicInteger;
68 import java.util.concurrent.atomic.AtomicReference;
69 import org.apache.commons.dbcp2.BasicDataSource;
70 import org.junit.jupiter.api.AfterAll;
71 import org.junit.jupiter.api.AfterEach;
72 import org.junit.jupiter.api.BeforeAll;
73 import org.junit.jupiter.api.BeforeEach;
74 import org.junit.jupiter.api.Test;
75 import org.junit.jupiter.api.extension.ExtendWith;
76 import org.kie.api.runtime.KieSession;
77 import org.mockito.ArgumentCaptor;
78 import org.mockito.Mock;
79 import org.mockito.MockitoAnnotations;
80 import org.mockito.junit.jupiter.MockitoExtension;
81 import org.onap.policy.common.utils.services.OrderedServiceImpl;
82 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
83 import org.onap.policy.drools.core.PolicySession;
84 import org.onap.policy.drools.core.lock.Lock;
85 import org.onap.policy.drools.core.lock.LockCallback;
86 import org.onap.policy.drools.core.lock.LockState;
87 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
88 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
89 import org.onap.policy.drools.system.PolicyEngine;
90 import org.onap.policy.drools.system.PolicyEngineConstants;
91 import org.springframework.test.util.ReflectionTestUtils;
92
93 @ExtendWith(MockitoExtension.class)
94 class DistributedLockManagerTest {
95     private static final long EXPIRE_SEC = 900L;
96     private static final long RETRY_SEC = 60L;
97     private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
98     private static final String OTHER_HOST = "other-host";
99     private static final String OTHER_OWNER = "other-owner";
100     private static final String EXPECTED_EXCEPTION = "expected exception";
101     private static final String DB_CONNECTION =
102         "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
103     private static final String DB_USER = "user";
104     private static final String DB_PASSWORD = "password";
105     private static final String OWNER_KEY = "my key";
106     private static final String RESOURCE = "my resource";
107     private static final String RESOURCE2 = "my resource #2";
108     private static final String RESOURCE3 = "my resource #3";
109     private static final String RESOURCE4 = "my resource #4";
110     private static final String RESOURCE5 = "my resource #5";
111     private static final int HOLD_SEC = 100;
112     private static final int HOLD_SEC2 = 120;
113     private static final int MAX_THREADS = 5;
114     private static final int MAX_LOOPS = 100;
115     private static final boolean TRANSIENT = true;
116     private static final boolean PERMANENT = false;
117
118     // number of execute() calls before the first lock attempt
119     private static final int PRE_LOCK_EXECS = 1;
120
121     // number of execute() calls before the first schedule attempt
122     private static final int PRE_SCHED_EXECS = 1;
123
124     private static Connection conn = null;
125     private static ScheduledExecutorService saveExec;
126     private static ScheduledExecutorService realExec;
127
128     @Mock
129     private PolicyEngine engine;
130
131     @Mock
132     private KieSession kieSess;
133
134     @Mock
135     private ScheduledExecutorService exsvc;
136
137     @Mock
138     private ScheduledFuture<?> checker;
139
140     @Mock
141     private LockCallback callback;
142
143     @Mock
144     private BasicDataSource datasrc;
145
146     private DistributedLock lock;
147
148     private AtomicInteger nactive;
149     private AtomicInteger nsuccesses;
150     private DistributedLockManager feature;
151
152     AutoCloseable closeable;
153
154     /**
155      * Configures the location of the property files and creates the DB.
156      *
157      * @throws SQLException if the DB cannot be created
158      */
159     @BeforeAll
160     static void setUpBeforeClass() throws SQLException {
161         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
162         PolicyEngineConstants.getManager().configure(new Properties());
163
164         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
165
166         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
167             + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
168             + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
169             createStmt.executeUpdate();
170         }
171
172         saveExec = (ScheduledExecutorService) ReflectionTestUtils.getField(PolicyEngineConstants.getManager(),
173             POLICY_ENGINE_EXECUTOR_FIELD);
174
175         realExec = Executors.newScheduledThreadPool(3);
176     }
177
178     /**
179      * Restores static fields.
180      */
181     @AfterAll
182     static void tearDownAfterClass() throws SQLException {
183         ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
184         realExec.shutdown();
185         conn.close();
186     }
187
188     /**
189      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
190      * tasks.
191      *
192      * @throws SQLException if the lock records cannot be deleted from the DB
193      */
194     @BeforeEach
195     void setUp() throws SQLException {
196         closeable = MockitoAnnotations.openMocks(this);
197         // grant() and deny() calls will come through here and be immediately executed
198         PolicySession session = new PolicySession(null, null, kieSess) {
199             @Override
200             public void insertDrools(Object object) {
201                 ((Runnable) object).run();
202             }
203         };
204
205         session.setPolicySession();
206
207         nactive = new AtomicInteger(0);
208         nsuccesses = new AtomicInteger(0);
209
210         cleanDb();
211
212         feature = new MyLockingFeature(true);
213     }
214
215     @AfterEach
216     void tearDown() throws Exception {
217         shutdownFeature();
218         cleanDb();
219         closeable.close();
220     }
221
222     private void cleanDb() throws SQLException {
223         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
224             stmt.executeUpdate();
225         }
226     }
227
228     private void shutdownFeature() {
229         if (feature != null) {
230             feature.afterStop(engine);
231             feature = null;
232         }
233     }
234
235     /**
236      * Tests that the feature is found in the expected service sets.
237      */
238     @Test
239     void testServiceApis() {
240         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
241             .anyMatch(obj -> obj instanceof DistributedLockManager));
242     }
243
244     @Test
245     void testGetSequenceNumber() {
246         assertEquals(1000, feature.getSequenceNumber());
247     }
248
249     @Test
250     void testBeforeCreateLockManager() {
251         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
252     }
253
254     /**
255      * Tests beforeCreate(), when getProperties() throws a runtime exception.
256      */
257     @Test
258     void testBeforeCreateLockManagerEx() {
259         shutdownFeature();
260
261         feature = new MyLockingFeature(false) {
262             @Override
263             protected Properties getProperties(String fileName) {
264                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
265             }
266         };
267
268         Properties props = new Properties();
269         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, props))
270             .isInstanceOf(DistributedLockManagerException.class);
271     }
272
273     @Test
274     void testAfterStart() {
275         // verify that cleanup & expire check are both added to the queue
276         verify(exsvc).execute(any());
277         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
278     }
279
280     /**
281      * Tests afterStart(), when thread pool throws a runtime exception.
282      */
283     @Test
284     void testAfterStartExInThreadPool() {
285         shutdownFeature();
286
287         feature = new MyLockingFeature(false);
288
289         doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
290
291         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
292     }
293
294     @Test
295     void testDeleteExpiredDbLocks() throws SQLException {
296         // add records: two expired, one not
297         insertRecord(RESOURCE, feature.getUuidString(), -1);
298         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
299         insertRecord(RESOURCE3, OTHER_OWNER, 0);
300         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
301
302         // get the clean-up function and execute it
303         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
304         verify(exsvc).execute(captor.capture());
305
306         long tbegin = System.currentTimeMillis();
307         Runnable action = captor.getValue();
308         action.run();
309
310         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
311         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
312         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
313         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
314
315         assertEquals(2, getRecordCount());
316     }
317
318     /**
319      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
320      *
321      */
322     @Test
323     void testDeleteExpiredDbLocksEx() {
324         feature = new InvalidDbLockingFeature(TRANSIENT);
325
326         // get the clean-up function and execute it
327         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
328         verify(exsvc).execute(captor.capture());
329
330         Runnable action = captor.getValue();
331
332         // should not throw an exception
333         action.run();
334     }
335
336     @Test
337     void testAfterStop() {
338         shutdownFeature();
339         verify(checker).cancel(anyBoolean());
340
341         feature = new DistributedLockManager();
342
343         // shutdown without calling afterStart()
344
345         shutdownFeature();
346     }
347
348     /**
349      * Tests afterStop(), when the data source throws an exception when close() is called.
350      *
351      */
352     @Test
353     void testAfterStopEx() {
354         shutdownFeature();
355
356         // use a data source that throws an exception when closed
357         feature = new InvalidDbLockingFeature(TRANSIENT);
358
359         assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
360     }
361
362     @Test
363     void testCreateLock() throws SQLException {
364         verify(exsvc).execute(any());
365
366         lock = getLock(RESOURCE, callback);
367         assertTrue(lock.isWaiting());
368
369         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
370
371         // this lock should fail
372         LockCallback callback2 = mock(LockCallback.class);
373         DistributedLock lock2 = getLock(RESOURCE, callback2);
374         assertTrue(lock2.isUnavailable());
375         verify(callback2, never()).lockAvailable(lock2);
376         verify(callback2).lockUnavailable(lock2);
377
378         // this should fail, too
379         LockCallback callback3 = mock(LockCallback.class);
380         DistributedLock lock3 = getLock(RESOURCE, callback3);
381         assertTrue(lock3.isUnavailable());
382         verify(callback3, never()).lockAvailable(lock3);
383         verify(callback3).lockUnavailable(lock3);
384
385         // no change to first
386         assertTrue(lock.isWaiting());
387
388         // no callbacks to the first lock
389         verify(callback, never()).lockAvailable(lock);
390         verify(callback, never()).lockUnavailable(lock);
391
392         assertTrue(lock.isWaiting());
393         assertEquals(0, getRecordCount());
394
395         runLock(0, 0);
396         assertTrue(lock.isActive());
397         assertEquals(1, getRecordCount());
398
399         verify(callback).lockAvailable(lock);
400         verify(callback, never()).lockUnavailable(lock);
401
402         // this should succeed
403         DistributedLock lock4 = getLock(RESOURCE2, callback);
404         assertTrue(lock4.isWaiting());
405
406         // after running checker, original records should still remain
407         runChecker(0, EXPIRE_SEC);
408         assertEquals(1, getRecordCount());
409         verify(callback, never()).lockUnavailable(lock);
410     }
411
412     /**
413      * Tests createLock() when the feature is not the latest instance.
414      */
415     @Test
416     void testCreateLockNotLatestInstance() {
417         DistributedLockManager.setLatestInstance(null);
418
419         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
420         assertTrue(lock.isUnavailable());
421         verify(callback, never()).lockAvailable(any());
422         verify(callback).lockUnavailable(lock);
423     }
424
425     @Test
426     void testCheckExpired() throws SQLException {
427         lock = getLock(RESOURCE, callback);
428         runLock(0, 0);
429
430         LockCallback callback2 = mock(LockCallback.class);
431         final DistributedLock lock2 = getLock(RESOURCE2, callback2);
432         runLock(1, 0);
433
434         LockCallback callback3 = mock(LockCallback.class);
435         final DistributedLock lock3 = getLock(RESOURCE3, callback3);
436         runLock(2, 0);
437
438         LockCallback callback4 = mock(LockCallback.class);
439         final DistributedLock lock4 = getLock(RESOURCE4, callback4);
440         runLock(3, 0);
441
442         LockCallback callback5 = mock(LockCallback.class);
443         final DistributedLock lock5 = getLock(RESOURCE5, callback5);
444         runLock(4, 0);
445
446         assertEquals(5, getRecordCount());
447
448         // expire one record
449         updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
450
451         // change host of another record
452         updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
453
454         // change uuid of another record
455         updateRecord(RESOURCE5, feature.getPdpName(), OTHER_OWNER, HOLD_SEC);
456
457         // run the checker
458         runChecker(0, EXPIRE_SEC);
459
460         // check lock states
461         assertTrue(lock.isUnavailable());
462         assertTrue(lock2.isActive());
463         assertTrue(lock3.isUnavailable());
464         assertTrue(lock4.isActive());
465         assertTrue(lock5.isUnavailable());
466
467         // allow callbacks
468         runLock(2, 2);
469         runLock(3, 1);
470         runLock(4, 0);
471         verify(callback).lockUnavailable(lock);
472         verify(callback3).lockUnavailable(lock3);
473         verify(callback5).lockUnavailable(lock5);
474
475         verify(callback2, never()).lockUnavailable(lock2);
476         verify(callback4, never()).lockUnavailable(lock4);
477
478         // another check should have been scheduled, with the normal interval
479         runChecker(1, EXPIRE_SEC);
480     }
481
482     /**
483      * Tests checkExpired(), when schedule() throws an exception.
484      */
485     @Test
486     void testCheckExpiredExecRejected() {
487         // arrange for execution to be rejected
488         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
489             .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
490
491         runChecker(0, EXPIRE_SEC);
492     }
493
494     /**
495      * Tests checkExpired(), when getConnection() throws an exception.
496      */
497     @Test
498     void testCheckExpiredSqlEx() {
499         // use a data source that throws an exception when getConnection() is called
500         feature = new InvalidDbLockingFeature(TRANSIENT);
501
502         runChecker(0, EXPIRE_SEC);
503
504         // it should have scheduled another check, sooner
505         runChecker(0, RETRY_SEC);
506     }
507
508     /**
509      * Tests checkExpired(), when getConnection() throws an exception and the feature is
510      * no longer alive.
511      */
512     @Test
513     void testCheckExpiredSqlExFeatureStopped() {
514         // use a data source that throws an exception when getConnection() is called
515         feature = new InvalidDbLockingFeature(TRANSIENT) {
516             @Override
517             protected SQLException makeEx() {
518                 this.stop();
519                 return super.makeEx();
520             }
521         };
522
523         runChecker(0, EXPIRE_SEC);
524
525         // it should NOT have scheduled another check
526         verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
527     }
528
529     @Test
530     void testExpireLocks() throws SQLException {
531         AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
532
533         feature = new MyLockingFeature(true) {
534             @Override
535             protected BasicDataSource makeDataSource() throws Exception {
536                 // get the real data source
537                 BasicDataSource src2 = super.makeDataSource();
538
539                 when(datasrc.getConnection()).thenAnswer(answer -> {
540                     DistributedLock lck = freeLock.getAndSet(null);
541                     if (lck != null) {
542                         // free it
543                         lck.free();
544
545                         // run its doUnlock
546                         runLock(4, 0);
547                     }
548
549                     return src2.getConnection();
550                 });
551
552                 return datasrc;
553             }
554         };
555
556         lock = getLock(RESOURCE, callback);
557         runLock(0, 0);
558
559         LockCallback callback2 = mock(LockCallback.class);
560         final DistributedLock lock2 = getLock(RESOURCE2, callback2);
561         runLock(1, 0);
562
563         LockCallback callback3 = mock(LockCallback.class);
564         final DistributedLock lock3 = getLock(RESOURCE3, callback3);
565         // don't run doLock for lock3 - leave it in the waiting state
566
567         LockCallback callback4 = mock(LockCallback.class);
568         final DistributedLock lock4 = getLock(RESOURCE4, callback4);
569         runLock(3, 0);
570
571         assertEquals(3, getRecordCount());
572
573         // expire one record
574         updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
575
576         // arrange to free lock4 while the checker is running
577         freeLock.set(lock4);
578
579         // run the checker
580         runChecker(0, EXPIRE_SEC);
581
582         // check lock states
583         assertTrue(lock.isUnavailable());
584         assertTrue(lock2.isActive());
585         assertTrue(lock3.isWaiting());
586         assertTrue(lock4.isUnavailable());
587
588         runLock(4, 0);
589         verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
590
591         verify(callback).lockUnavailable(lock);
592         verify(callback2, never()).lockUnavailable(lock2);
593         verify(callback3, never()).lockUnavailable(lock3);
594         verify(callback4, never()).lockUnavailable(lock4);
595     }
596
597     @Test
598     void testDistributedLockNoArgs() {
599         DistributedLock lock = new DistributedLock();
600         assertNull(lock.getResourceId());
601         assertNull(lock.getOwnerKey());
602         assertNull(lock.getCallback());
603         assertEquals(0, lock.getHoldSec());
604     }
605
606     @Test
607     void testDistributedLock() {
608         assertThatIllegalArgumentException()
609             .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
610             .withMessageContaining("holdSec");
611
612         // should generate no exception
613         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
614     }
615
616     @Test
617     void testDistributedLockSerializable() throws Exception {
618         DistributedLock lock = getLock(RESOURCE, callback);
619         lock = roundTrip(lock);
620
621         assertTrue(lock.isWaiting());
622
623         assertEquals(RESOURCE, lock.getResourceId());
624         assertEquals(OWNER_KEY, lock.getOwnerKey());
625         assertNull(lock.getCallback());
626         assertEquals(HOLD_SEC, lock.getHoldSec());
627     }
628
629     @Test
630     void testGrant() {
631         lock = getLock(RESOURCE, callback);
632         assertFalse(lock.isActive());
633
634         // execute the doLock() call
635         runLock(0, 0);
636
637         assertTrue(lock.isActive());
638
639         // the callback for the lock should have been run in the foreground thread
640         verify(callback).lockAvailable(lock);
641     }
642
643     @Test
644     void testDistributedLockDeny() {
645         // get a lock
646         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
647
648         // get another lock - should fail
649         lock = getLock(RESOURCE, callback);
650
651         assertTrue(lock.isUnavailable());
652
653         // the callback for the second lock should have been run in the foreground thread
654         verify(callback).lockUnavailable(lock);
655
656         // should only have a request for the first lock
657         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
658     }
659
660     @Test
661     void testDistributedLockFree() {
662         lock = getLock(RESOURCE, callback);
663
664         assertTrue(lock.free());
665         assertTrue(lock.isUnavailable());
666
667         // run both requests associated with the lock
668         runLock(0, 1);
669         runLock(1, 0);
670
671         // should not have changed state
672         assertTrue(lock.isUnavailable());
673
674         // attempt to free it again
675         assertFalse(lock.free());
676
677         // should not have queued anything else
678         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
679
680         // new lock should succeed
681         DistributedLock lock2 = getLock(RESOURCE, callback);
682         assertNotSame(lock2, lock);
683         assertTrue(lock2.isWaiting());
684     }
685
686     /**
687      * Tests that free() works on a serialized lock with a new feature.
688      *
689      * @throws Exception if an error occurs
690      */
691     @Test
692     void testDistributedLockFreeSerialized() throws Exception {
693         DistributedLock lock = getLock(RESOURCE, callback);
694
695         feature = new MyLockingFeature(true);
696
697         lock = roundTrip(lock);
698         assertTrue(lock.free());
699         assertTrue(lock.isUnavailable());
700     }
701
702     /**
703      * Tests free() on a serialized lock without a feature.
704      *
705      * @throws Exception if an error occurs
706      */
707     @Test
708     void testDistributedLockFreeNoFeature() throws Exception {
709         DistributedLock lock = getLock(RESOURCE, callback);
710
711         DistributedLockManager.setLatestInstance(null);
712
713         lock = roundTrip(lock);
714         assertFalse(lock.free());
715         assertTrue(lock.isUnavailable());
716     }
717
718     /**
719      * Tests the case where the lock is freed and doUnlock called between the call to
720      * isUnavailable() and the call to compute().
721      */
722     @Test
723     void testDistributedLockFreeUnlocked() {
724         feature = new FreeWithFreeLockingFeature(true);
725
726         lock = getLock(RESOURCE, callback);
727
728         assertFalse(lock.free());
729         assertTrue(lock.isUnavailable());
730     }
731
732     /**
733      * Tests the case where the lock is freed, but doUnlock is not completed, between the
734      * call to isUnavailable() and the call to compute().
735      */
736     @Test
737     void testDistributedLockFreeLockFreed() {
738         feature = new FreeWithFreeLockingFeature(false);
739
740         lock = getLock(RESOURCE, callback);
741
742         assertFalse(lock.free());
743         assertTrue(lock.isUnavailable());
744     }
745
746     @Test
747     void testDistributedLockExtend() {
748         lock = getLock(RESOURCE, callback);
749
750         // lock2 should be denied - called back by this thread
751         DistributedLock lock2 = getLock(RESOURCE, callback);
752         verify(callback, never()).lockAvailable(lock2);
753         verify(callback).lockUnavailable(lock2);
754
755         // lock2 will still be denied - called back by this thread
756         lock2.extend(HOLD_SEC, callback);
757         verify(callback, times(2)).lockUnavailable(lock2);
758
759         // force lock2 to be active - should still be denied
760         ReflectionTestUtils.setField(lock2, "state", LockState.ACTIVE);
761         lock2.extend(HOLD_SEC, callback);
762         verify(callback, times(3)).lockUnavailable(lock2);
763
764         assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
765             .withMessageContaining("holdSec");
766
767         // execute doLock()
768         runLock(0, 0);
769         assertTrue(lock.isActive());
770
771         // now extend the first lock
772         LockCallback callback2 = mock(LockCallback.class);
773         lock.extend(HOLD_SEC2, callback2);
774         assertTrue(lock.isWaiting());
775
776         // execute doExtend()
777         runLock(1, 0);
778         lock.extend(HOLD_SEC2, callback2);
779         assertEquals(HOLD_SEC2, lock.getHoldSec());
780         verify(callback2).lockAvailable(lock);
781         verify(callback2, never()).lockUnavailable(lock);
782     }
783
784     /**
785      * Tests that extend() works on a serialized lock with a new feature.
786      *
787      * @throws Exception if an error occurs
788      */
789     @Test
790     void testDistributedLockExtendSerialized() throws Exception {
791         DistributedLock lock = getLock(RESOURCE, callback);
792
793         // run doLock
794         runLock(0, 0);
795         assertTrue(lock.isActive());
796
797         feature = new MyLockingFeature(true);
798
799         lock = roundTrip(lock);
800         assertTrue(lock.isActive());
801
802         LockCallback scallback = mock(LockCallback.class);
803
804         lock.extend(HOLD_SEC, scallback);
805         assertTrue(lock.isWaiting());
806
807         // run doExtend (in new feature)
808         runLock(0, 0);
809         assertTrue(lock.isActive());
810
811         verify(scallback).lockAvailable(lock);
812         verify(scallback, never()).lockUnavailable(lock);
813     }
814
815     /**
816      * Tests extend() on a serialized lock without a feature.
817      *
818      * @throws Exception if an error occurs
819      */
820     @Test
821     void testDistributedLockExtendNoFeature() throws Exception {
822         DistributedLock lock = getLock(RESOURCE, callback);
823
824         // run doLock
825         runLock(0, 0);
826         assertTrue(lock.isActive());
827
828         DistributedLockManager.setLatestInstance(null);
829
830         lock = roundTrip(lock);
831         assertTrue(lock.isActive());
832
833         LockCallback scallback = mock(LockCallback.class);
834
835         lock.extend(HOLD_SEC, scallback);
836         assertTrue(lock.isUnavailable());
837
838         verify(scallback, never()).lockAvailable(lock);
839         verify(scallback).lockUnavailable(lock);
840     }
841
842     /**
843      * Tests the case where the lock is freed and doUnlock called between the call to
844      * isUnavailable() and the call to compute().
845      */
846     @Test
847     void testDistributedLockExtendUnlocked() {
848         feature = new FreeWithFreeLockingFeature(true);
849
850         lock = getLock(RESOURCE, callback);
851
852         lock.extend(HOLD_SEC2, callback);
853         assertTrue(lock.isUnavailable());
854         verify(callback).lockUnavailable(lock);
855     }
856
857     /**
858      * Tests the case where the lock is freed, but doUnlock is not completed, between the
859      * call to isUnavailable() and the call to compute().
860      */
861     @Test
862     void testDistributedLockExtendLockFreed() {
863         feature = new FreeWithFreeLockingFeature(false);
864
865         lock = getLock(RESOURCE, callback);
866
867         lock.extend(HOLD_SEC2, callback);
868         assertTrue(lock.isUnavailable());
869         verify(callback).lockUnavailable(lock);
870     }
871
872     @Test
873     void testDistributedLockScheduleRequest() {
874         lock = getLock(RESOURCE, callback);
875         runLock(0, 0);
876
877         verify(callback).lockAvailable(lock);
878     }
879
880     @Test
881     void testDistributedLockRescheduleRequest() {
882         // use a data source that throws an exception when getConnection() is called
883         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
884         feature = invfeat;
885
886         lock = getLock(RESOURCE, callback);
887
888         // invoke doLock - should fail and reschedule
889         runLock(0, 0);
890
891         // should still be waiting
892         assertTrue(lock.isWaiting());
893         verify(callback, never()).lockUnavailable(lock);
894
895         // free the lock while doLock is executing
896         invfeat.freeLock = true;
897
898         // try scheduled request - should just invoke doUnlock
899         runSchedule(0);
900
901         // should still be waiting
902         assertTrue(lock.isUnavailable());
903         verify(callback, never()).lockUnavailable(lock);
904
905         // should have scheduled a retry of doUnlock
906         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
907     }
908
909     @Test
910     void testDistributedLockGetNextRequest() {
911         lock = getLock(RESOURCE, callback);
912
913         /*
914          * run doLock. This should cause getNextRequest() to be called twice, once with a
915          * request in the queue, and the second time with request=null.
916          */
917         runLock(0, 0);
918     }
919
920     /**
921      * Tests getNextRequest(), where the same request is still in the queue the second
922      * time it's called.
923      */
924     @Test
925     void testDistributedLockGetNextRequestSameRequest() {
926         // force reschedule to be invoked
927         feature = new InvalidDbLockingFeature(TRANSIENT);
928
929         lock = getLock(RESOURCE, callback);
930
931         /*
932          * run doLock. This should cause getNextRequest() to be called twice, once with a
933          * request in the queue, and the second time with the same request again.
934          */
935         runLock(0, 0);
936
937         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
938     }
939
940     @Test
941     void testDistributedLockDoRequest() {
942         lock = getLock(RESOURCE, callback);
943
944         assertTrue(lock.isWaiting());
945
946         // run doLock via doRequest
947         runLock(0, 0);
948
949         assertTrue(lock.isActive());
950     }
951
952     /**
953      * Tests doRequest(), when doRequest() is already running within another thread.
954      */
955     @Test
956     void testDistributedLockDoRequestBusy() {
957         /*
958          * this feature will invoke a request in a background thread while it's being run
959          * in a foreground thread.
960          */
961         AtomicBoolean running = new AtomicBoolean(false);
962         AtomicBoolean returned = new AtomicBoolean(false);
963
964         feature = new MyLockingFeature(true) {
965             @Override
966             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
967                 LockCallback callback) {
968                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
969                     private static final long serialVersionUID = 1L;
970
971                     @Override
972                     protected boolean doDbInsert(Connection conn) throws SQLException {
973                         if (running.get()) {
974                             // already inside the thread - don't recurse any further
975                             return super.doDbInsert(conn);
976                         }
977
978                         running.set(true);
979
980                         Thread thread = new Thread(() -> {
981                             // run doLock from within the new thread
982                             runLock(0, 0);
983                         });
984                         thread.setDaemon(true);
985                         thread.start();
986
987                         // wait for the background thread to complete before continuing
988                         try {
989                             thread.join(5000);
990                         } catch (InterruptedException ignore) {
991                             Thread.currentThread().interrupt();
992                         }
993
994                         returned.set(!thread.isAlive());
995
996                         return super.doDbInsert(conn);
997                     }
998                 };
999             }
1000         };
1001
1002         lock = getLock(RESOURCE, callback);
1003
1004         // run doLock
1005         runLock(0, 0);
1006
1007         assertTrue(returned.get());
1008     }
1009
1010     /**
1011      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1012      *
1013      * @throws SQLException if an error occurs
1014      */
1015     @Test
1016     void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1017         // throw run-time exception
1018         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1019
1020         // use a data source that throws an exception when getConnection() is called
1021         feature = new MyLockingFeature(true) {
1022             @Override
1023             protected BasicDataSource makeDataSource() {
1024                 return datasrc;
1025             }
1026         };
1027
1028         lock = getLock(RESOURCE, callback);
1029
1030         // invoke doLock - should NOT reschedule
1031         runLock(0, 0);
1032
1033         assertTrue(lock.isUnavailable());
1034         verify(callback).lockUnavailable(lock);
1035
1036         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1037     }
1038
1039     /**
1040      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1041      * state.
1042      *
1043      * @throws SQLException if an error occurs
1044      */
1045     @Test
1046     void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1047         // throw run-time exception
1048         when(datasrc.getConnection()).thenAnswer(answer -> {
1049             lock.free();
1050             throw new IllegalStateException(EXPECTED_EXCEPTION);
1051         });
1052
1053         // use a data source that throws an exception when getConnection() is called
1054         feature = new MyLockingFeature(true) {
1055             @Override
1056             protected BasicDataSource makeDataSource() {
1057                 return datasrc;
1058             }
1059         };
1060
1061         lock = getLock(RESOURCE, callback);
1062
1063         // invoke doLock - should NOT reschedule
1064         runLock(0, 0);
1065
1066         assertTrue(lock.isUnavailable());
1067         verify(callback, never()).lockUnavailable(lock);
1068
1069         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1070     }
1071
1072     /**
1073      * Tests doRequest() when the retry count gets exhausted.
1074      */
1075     @Test
1076     void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1077         // use a data source that throws an exception when getConnection() is called
1078         feature = new InvalidDbLockingFeature(TRANSIENT);
1079
1080         lock = getLock(RESOURCE, callback);
1081
1082         // invoke doLock - should fail and reschedule
1083         runLock(0, 0);
1084
1085         // should still be waiting
1086         assertTrue(lock.isWaiting());
1087         verify(callback, never()).lockUnavailable(lock);
1088
1089         // try again, via SCHEDULER - first retry fails
1090         runSchedule(0);
1091
1092         // should still be waiting
1093         assertTrue(lock.isWaiting());
1094         verify(callback, never()).lockUnavailable(lock);
1095
1096         // try again, via SCHEDULER - final retry fails
1097         runSchedule(1);
1098         assertTrue(lock.isUnavailable());
1099
1100         // now callback should have been called
1101         verify(callback).lockUnavailable(lock);
1102     }
1103
1104     /**
1105      * Tests doRequest() when a non-transient DB exception is thrown.
1106      */
1107     @Test
1108     void testDistributedLockDoRequestNotTransient() {
1109         /*
1110          * use a data source that throws a PERMANENT exception when getConnection() is
1111          * called
1112          */
1113         feature = new InvalidDbLockingFeature(PERMANENT);
1114
1115         lock = getLock(RESOURCE, callback);
1116
1117         // invoke doLock - should fail
1118         runLock(0, 0);
1119
1120         assertTrue(lock.isUnavailable());
1121         verify(callback).lockUnavailable(lock);
1122
1123         // should not have scheduled anything new
1124         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1125         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1126     }
1127
1128     @Test
1129     void testDistributedLockDoLock() throws SQLException {
1130         lock = getLock(RESOURCE, callback);
1131
1132         // invoke doLock - should simply do an insert
1133         long tbegin = System.currentTimeMillis();
1134         runLock(0, 0);
1135
1136         assertEquals(1, getRecordCount());
1137         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1138         verify(callback).lockAvailable(lock);
1139     }
1140
1141     /**
1142      * Tests doLock() when the lock is freed before doLock runs.
1143      *
1144      * @throws SQLException if an error occurs
1145      */
1146     @Test
1147     void testDistributedLockDoLockFreed() throws SQLException {
1148         lock = getLock(RESOURCE, callback);
1149
1150         lock.setState(LockState.UNAVAILABLE);
1151
1152         // invoke doLock - should do nothing
1153         runLock(0, 0);
1154
1155         assertEquals(0, getRecordCount());
1156
1157         verify(callback, never()).lockAvailable(lock);
1158     }
1159
1160     /**
1161      * Tests doLock() when a DB exception is thrown.
1162      */
1163     @Test
1164     void testDistributedLockDoLockEx() {
1165         // use a data source that throws an exception when getConnection() is called
1166         feature = new InvalidDbLockingFeature(PERMANENT);
1167
1168         lock = getLock(RESOURCE, callback);
1169
1170         // invoke doLock - should simply do an insert
1171         runLock(0, 0);
1172
1173         // lock should have failed due to exception
1174         verify(callback).lockUnavailable(lock);
1175     }
1176
1177     /**
1178      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1179      * to be called.
1180      */
1181     @Test
1182     void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1183         // insert an expired record
1184         insertRecord(RESOURCE, feature.getUuidString(), 0);
1185
1186         lock = getLock(RESOURCE, callback);
1187
1188         // invoke doLock - should simply do an update
1189         runLock(0, 0);
1190         verify(callback).lockAvailable(lock);
1191     }
1192
1193     /**
1194      * Tests doLock() when a locked record already exists.
1195      */
1196     @Test
1197     void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1198         // insert an expired record
1199         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1200
1201         lock = getLock(RESOURCE, callback);
1202
1203         // invoke doLock
1204         runLock(0, 0);
1205
1206         // lock should have failed because it's already locked
1207         verify(callback).lockUnavailable(lock);
1208     }
1209
1210     @Test
1211     void testDistributedLockDoUnlock() throws SQLException {
1212         lock = getLock(RESOURCE, callback);
1213
1214         // invoke doLock()
1215         runLock(0, 0);
1216
1217         lock.free();
1218
1219         // invoke doUnlock()
1220         long tbegin = System.currentTimeMillis();
1221         runLock(1, 0);
1222
1223         assertEquals(0, getRecordCount());
1224         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1225
1226         assertTrue(lock.isUnavailable());
1227
1228         // no more callbacks should have occurred
1229         verify(callback, times(1)).lockAvailable(lock);
1230         verify(callback, never()).lockUnavailable(lock);
1231     }
1232
1233     /**
1234      * Tests doUnlock() when a DB exception is thrown.
1235      *
1236      */
1237     @Test
1238     void testDistributedLockDoUnlockEx() {
1239         feature = new InvalidDbLockingFeature(PERMANENT);
1240
1241         lock = getLock(RESOURCE, callback);
1242
1243         // do NOT invoke doLock() - it will fail without a DB connection
1244
1245         lock.free();
1246
1247         // invoke doUnlock()
1248         runLock(1, 0);
1249
1250         assertTrue(lock.isUnavailable());
1251
1252         // no more callbacks should have occurred
1253         verify(callback, never()).lockAvailable(lock);
1254         verify(callback, never()).lockUnavailable(lock);
1255     }
1256
1257     @Test
1258     void testDistributedLockDoExtend() throws SQLException {
1259         lock = getLock(RESOURCE, callback);
1260         runLock(0, 0);
1261
1262         LockCallback callback2 = mock(LockCallback.class);
1263         lock.extend(HOLD_SEC2, callback2);
1264
1265         // call doExtend()
1266         long tbegin = System.currentTimeMillis();
1267         runLock(1, 0);
1268
1269         assertEquals(1, getRecordCount());
1270         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1271
1272         assertTrue(lock.isActive());
1273
1274         // no more callbacks should have occurred
1275         verify(callback).lockAvailable(lock);
1276         verify(callback, never()).lockUnavailable(lock);
1277
1278         // extension should have succeeded
1279         verify(callback2).lockAvailable(lock);
1280         verify(callback2, never()).lockUnavailable(lock);
1281     }
1282
1283     /**
1284      * Tests doExtend() when the lock is freed before doExtend runs.
1285      *
1286      * @throws SQLException if an error occurs
1287      */
1288     @Test
1289     void testDistributedLockDoExtendFreed() throws SQLException {
1290         lock = getLock(RESOURCE, callback);
1291         lock.extend(HOLD_SEC2, callback);
1292
1293         lock.setState(LockState.UNAVAILABLE);
1294
1295         // invoke doExtend - should do nothing
1296         runLock(1, 0);
1297
1298         assertEquals(0, getRecordCount());
1299
1300         verify(callback, never()).lockAvailable(lock);
1301     }
1302
1303     /**
1304      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1305      * insert.
1306      *
1307      * @throws SQLException if an error occurs
1308      */
1309     @Test
1310     void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1311         lock = getLock(RESOURCE, callback);
1312         runLock(0, 0);
1313
1314         LockCallback callback2 = mock(LockCallback.class);
1315         lock.extend(HOLD_SEC2, callback2);
1316
1317         // delete the record so it's forced to re-insert it
1318         cleanDb();
1319
1320         // call doExtend()
1321         long tbegin = System.currentTimeMillis();
1322         runLock(1, 0);
1323
1324         assertEquals(1, getRecordCount());
1325         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1326
1327         assertTrue(lock.isActive());
1328
1329         // no more callbacks should have occurred
1330         verify(callback).lockAvailable(lock);
1331         verify(callback, never()).lockUnavailable(lock);
1332
1333         // extension should have succeeded
1334         verify(callback2).lockAvailable(lock);
1335         verify(callback2, never()).lockUnavailable(lock);
1336     }
1337
1338     /**
1339      * Tests doExtend() when both update and insert fail.
1340      *
1341      */
1342     @Test
1343     void testDistributedLockDoExtendNeitherSucceeds() {
1344         /*
1345          * this feature will create a lock that returns false when doDbUpdate() is
1346          * invoked, or when doDbInsert() is invoked a second time
1347          */
1348         feature = new MyLockingFeature(true) {
1349             @Override
1350             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1351                 LockCallback callback) {
1352                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1353                     private static final long serialVersionUID = 1L;
1354                     private int ntimes = 0;
1355
1356                     @Override
1357                     protected boolean doDbInsert(Connection conn) throws SQLException {
1358                         if (ntimes++ > 0) {
1359                             return false;
1360                         }
1361
1362                         return super.doDbInsert(conn);
1363                     }
1364
1365                     @Override
1366                     protected boolean doDbUpdate(Connection conn) {
1367                         return false;
1368                     }
1369                 };
1370             }
1371         };
1372
1373         lock = getLock(RESOURCE, callback);
1374         runLock(0, 0);
1375
1376         LockCallback callback2 = mock(LockCallback.class);
1377         lock.extend(HOLD_SEC2, callback2);
1378
1379         // call doExtend()
1380         runLock(1, 0);
1381
1382         assertTrue(lock.isUnavailable());
1383
1384         // no more callbacks should have occurred
1385         verify(callback).lockAvailable(lock);
1386         verify(callback, never()).lockUnavailable(lock);
1387
1388         // extension should have failed
1389         verify(callback2, never()).lockAvailable(lock);
1390         verify(callback2).lockUnavailable(lock);
1391     }
1392
1393     /**
1394      * Tests doExtend() when an exception occurs.
1395      *
1396      * @throws SQLException if an error occurs
1397      */
1398     @Test
1399     void testDistributedLockDoExtendEx() throws SQLException {
1400         lock = getLock(RESOURCE, callback);
1401         runLock(0, 0);
1402
1403         /*
1404          * delete the record and insert one with a different owner, which will cause
1405          * doDbInsert() to throw an exception
1406          */
1407         cleanDb();
1408         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1409
1410         LockCallback callback2 = mock(LockCallback.class);
1411         lock.extend(HOLD_SEC2, callback2);
1412
1413         // call doExtend()
1414         runLock(1, 0);
1415
1416         assertTrue(lock.isUnavailable());
1417
1418         // no more callbacks should have occurred
1419         verify(callback).lockAvailable(lock);
1420         verify(callback, never()).lockUnavailable(lock);
1421
1422         // extension should have failed
1423         verify(callback2, never()).lockAvailable(lock);
1424         verify(callback2).lockUnavailable(lock);
1425     }
1426
1427     @Test
1428     void testDistributedLockToString() {
1429         String text = getLock(RESOURCE, callback).toString();
1430         assertNotNull(text);
1431         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1432     }
1433
1434     @Test
1435     void testMakeThreadPool() {
1436         // use a REAL feature to test this
1437         feature = new DistributedLockManager();
1438
1439         // this should create a thread pool
1440         feature.beforeCreateLockManager(engine, new Properties());
1441         feature.afterStart(engine);
1442
1443         assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
1444     }
1445
1446     /**
1447      * Performs a multithreaded test of the locking facility.
1448      *
1449      * @throws InterruptedException if the current thread is interrupted while waiting for
1450      *         the background threads to complete
1451      */
1452     @Test
1453     void testMultiThreaded() throws InterruptedException {
1454         ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1455
1456         feature = new DistributedLockManager();
1457         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1458         feature.afterStart(PolicyEngineConstants.getManager());
1459
1460         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1461         for (int x = 0; x < MAX_THREADS; ++x) {
1462             threads.add(new MyThread());
1463         }
1464
1465         threads.forEach(Thread::start);
1466
1467         for (MyThread thread : threads) {
1468             thread.join(6000);
1469             assertFalse(thread.isAlive());
1470         }
1471
1472         for (MyThread thread : threads) {
1473             if (thread.err != null) {
1474                 throw thread.err;
1475             }
1476         }
1477
1478         assertTrue(nsuccesses.get() > 0);
1479     }
1480
1481     private DistributedLock getLock(String resource, LockCallback callback) {
1482         return (DistributedLock) feature.createLock(resource, DistributedLockManagerTest.OWNER_KEY,
1483             DistributedLockManagerTest.HOLD_SEC, callback, false);
1484     }
1485
1486     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1487         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1488         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1489             oos.writeObject(lock);
1490         }
1491
1492         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1493         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1494             return (DistributedLock) ois.readObject();
1495         }
1496     }
1497
1498     /**
1499      * Runs the checkExpired() action.
1500      *
1501      * @param nskip    number of actions in the work queue to skip
1502      * @param schedSec number of seconds for which the checker should have been scheduled
1503      */
1504     private void runChecker(int nskip, long schedSec) {
1505         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1506         verify(exsvc, times(nskip + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1507         Runnable action = captor.getAllValues().get(nskip);
1508         action.run();
1509     }
1510
1511     /**
1512      * Runs a lock action (e.g., doLock, doUnlock).
1513      *
1514      * @param nskip number of actions in the work queue to skip
1515      * @param nadditional number of additional actions that appear in the work queue
1516      *        <i>after</i> the desired action
1517      */
1518     void runLock(int nskip, int nadditional) {
1519         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1520         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1521
1522         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1523         action.run();
1524     }
1525
1526     /**
1527      * Runs a scheduled action (e.g., "retry" action).
1528      *
1529      * @param nskip number of actions in the work queue to skip
1530      */
1531     void runSchedule(int nskip) {
1532         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1533         verify(exsvc, times(PRE_SCHED_EXECS + nskip + 1)).schedule(captor.capture(), anyLong(), any());
1534
1535         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1536         action.run();
1537     }
1538
1539     /**
1540      * Gets a count of the number of lock records in the DB.
1541      *
1542      * @return the number of lock records in the DB
1543      * @throws SQLException if an error occurs accessing the DB
1544      */
1545     private int getRecordCount() throws SQLException {
1546         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1547             ResultSet result = stmt.executeQuery()) {
1548
1549             if (result.next()) {
1550                 return result.getInt(1);
1551
1552             } else {
1553                 return 0;
1554             }
1555         }
1556     }
1557
1558     /**
1559      * Determines if there is a record for the given resource whose expiration time is in
1560      * the expected range.
1561      *
1562      * @param resourceId ID of the resource of interest
1563      * @param uuidString UUID string of the owner
1564      * @param holdSec seconds for which the lock was to be held
1565      * @param tbegin earliest time, in milliseconds, at which the record could have been
1566      *        inserted into the DB
1567      * @return {@code true} if a record is found, {@code false} otherwise
1568      * @throws SQLException if an error occurs accessing the DB
1569      */
1570     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1571         try (PreparedStatement stmt =
1572             conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1573                 + " WHERE resourceId=? AND host=? AND owner=?")) {
1574
1575             stmt.setString(1, resourceId);
1576             stmt.setString(2, feature.getPdpName());
1577             stmt.setString(3, uuidString);
1578
1579             try (ResultSet result = stmt.executeQuery()) {
1580                 if (result.next()) {
1581                     int remaining = result.getInt(1);
1582                     long maxDiff = System.currentTimeMillis() - tbegin;
1583                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1584
1585                 } else {
1586                     return false;
1587                 }
1588             }
1589         }
1590     }
1591
1592     /**
1593      * Inserts a record into the DB.
1594      *
1595      * @param resourceId ID of the resource of interest
1596      * @param uuidString UUID string of the owner
1597      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1598      * @throws SQLException if an error occurs accessing the DB
1599      */
1600     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1601         this.insertRecord(resourceId, feature.getPdpName(), uuidString, expireOffset);
1602     }
1603
1604     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1605         throws SQLException {
1606         try (PreparedStatement stmt =
1607             conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1608                 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1609
1610             stmt.setString(1, resourceId);
1611             stmt.setString(2, hostName);
1612             stmt.setString(3, uuidString);
1613             stmt.setInt(4, expireOffset);
1614
1615             assertEquals(1, stmt.executeUpdate());
1616         }
1617     }
1618
1619     /**
1620      * Updates a record in the DB.
1621      *
1622      * @param resourceId ID of the resource of interest
1623      * @param newUuid UUID string of the <i>new</i> owner
1624      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1625      * @throws SQLException if an error occurs accessing the DB
1626      */
1627     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1628         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1629             + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1630
1631             stmt.setString(1, newHost);
1632             stmt.setString(2, newUuid);
1633             stmt.setInt(3, expireOffset);
1634             stmt.setString(4, resourceId);
1635
1636             assertEquals(1, stmt.executeUpdate());
1637         }
1638     }
1639
1640     /**
1641      * Feature that uses <i>exsvc</i> to execute requests.
1642      */
1643     private class MyLockingFeature extends DistributedLockManager {
1644
1645         public MyLockingFeature(boolean init) {
1646             shutdownFeature();
1647
1648             exsvc = mock(ScheduledExecutorService.class);
1649             lenient().when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1650             ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1651
1652             if (init) {
1653                 beforeCreateLockManager(engine, new Properties());
1654                 start();
1655                 afterStart(engine);
1656             }
1657         }
1658     }
1659
1660     /**
1661      * Feature whose data source all throws exceptions.
1662      */
1663     private class InvalidDbLockingFeature extends MyLockingFeature {
1664         private final boolean isTransient;
1665         private boolean freeLock = false;
1666
1667         InvalidDbLockingFeature(boolean isTransient) {
1668             // pass "false" because we have to set the error code BEFORE calling
1669             // afterStart()
1670             super(false);
1671
1672             this.isTransient = isTransient;
1673
1674             this.beforeCreateLockManager(engine, new Properties());
1675             this.start();
1676             this.afterStart(engine);
1677         }
1678
1679         @Override
1680         protected BasicDataSource makeDataSource() throws Exception {
1681             lenient().when(datasrc.getConnection()).thenAnswer(answer -> {
1682                 if (freeLock) {
1683                     freeLock = false;
1684                     lock.free();
1685                 }
1686
1687                 throw makeEx();
1688             });
1689
1690             doThrow(makeEx()).when(datasrc).close();
1691
1692             return datasrc;
1693         }
1694
1695         protected SQLException makeEx() {
1696             if (isTransient) {
1697                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1698
1699             } else {
1700                 return new SQLException(EXPECTED_EXCEPTION);
1701             }
1702         }
1703     }
1704
1705     /**
1706      * Feature whose locks free themselves while free() is already running.
1707      */
1708     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1709         private final boolean relock;
1710
1711         public FreeWithFreeLockingFeature(boolean relock) {
1712             super(true);
1713             this.relock = relock;
1714         }
1715
1716         @Override
1717         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1718             LockCallback callback) {
1719
1720             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1721                 private static final long serialVersionUID = 1L;
1722                 private boolean checked = false;
1723
1724                 @Override
1725                 public boolean isUnavailable() {
1726                     if (checked) {
1727                         return super.isUnavailable();
1728                     }
1729
1730                     checked = true;
1731
1732                     // release and relock
1733                     free();
1734
1735                     if (relock) {
1736                         // run doUnlock
1737                         runLock(1, 0);
1738
1739                         // relock it
1740                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1741                     }
1742
1743                     return false;
1744                 }
1745             };
1746         }
1747     }
1748
1749     /**
1750      * Thread used with the multithreaded test. It repeatedly attempts to get a lock,
1751      * extend it, and then unlock it.
1752      */
1753     private class MyThread extends Thread {
1754         AssertionError err = null;
1755
1756         public MyThread() {
1757             setDaemon(true);
1758         }
1759
1760         @Override
1761         public void run() {
1762             try {
1763                 for (int x = 0; x < MAX_LOOPS; ++x) {
1764                     makeAttempt();
1765                 }
1766
1767             } catch (AssertionError e) {
1768                 err = e;
1769             }
1770         }
1771
1772         private void makeAttempt() {
1773             try {
1774                 Semaphore sem = new Semaphore(0);
1775
1776                 LockCallback cb = new LockCallback() {
1777                     @Override
1778                     public void lockAvailable(Lock lock) {
1779                         sem.release();
1780                     }
1781
1782                     @Override
1783                     public void lockUnavailable(Lock lock) {
1784                         sem.release();
1785                     }
1786                 };
1787
1788                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1789
1790                 // wait for callback, whether available or unavailable
1791                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1792                 if (!lock.isActive()) {
1793                     return;
1794                 }
1795
1796                 nsuccesses.incrementAndGet();
1797
1798                 assertEquals(1, nactive.incrementAndGet());
1799
1800                 lock.extend(HOLD_SEC2, cb);
1801                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1802                 assertTrue(lock.isActive());
1803
1804                 // decrement BEFORE free()
1805                 nactive.decrementAndGet();
1806
1807                 assertTrue(lock.free());
1808                 assertTrue(lock.isUnavailable());
1809
1810             } catch (InterruptedException e) {
1811                 Thread.currentThread().interrupt();
1812                 throw new AssertionError("interrupted", e);
1813             }
1814         }
1815     }
1816 }