5351f004353ba1655346f5f0642913d13241d180
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.distributed.locking;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32 import static org.mockito.Matchers.any;
33 import static org.mockito.Matchers.anyBoolean;
34 import static org.mockito.Matchers.anyLong;
35 import static org.mockito.Matchers.eq;
36 import static org.mockito.Mockito.doThrow;
37 import static org.mockito.Mockito.mock;
38 import static org.mockito.Mockito.never;
39 import static org.mockito.Mockito.times;
40 import static org.mockito.Mockito.verify;
41 import static org.mockito.Mockito.when;
42
43 import java.io.ByteArrayInputStream;
44 import java.io.ByteArrayOutputStream;
45 import java.io.ObjectInputStream;
46 import java.io.ObjectOutputStream;
47 import java.sql.Connection;
48 import java.sql.DriverManager;
49 import java.sql.PreparedStatement;
50 import java.sql.ResultSet;
51 import java.sql.SQLException;
52 import java.sql.SQLTransientException;
53 import java.util.ArrayList;
54 import java.util.List;
55 import java.util.Properties;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.RejectedExecutionException;
58 import java.util.concurrent.ScheduledExecutorService;
59 import java.util.concurrent.ScheduledFuture;
60 import java.util.concurrent.Semaphore;
61 import java.util.concurrent.TimeUnit;
62 import java.util.concurrent.atomic.AtomicBoolean;
63 import java.util.concurrent.atomic.AtomicInteger;
64 import java.util.concurrent.atomic.AtomicReference;
65 import org.apache.commons.dbcp2.BasicDataSource;
66 import org.junit.After;
67 import org.junit.AfterClass;
68 import org.junit.Before;
69 import org.junit.BeforeClass;
70 import org.junit.Test;
71 import org.kie.api.runtime.KieSession;
72 import org.mockito.ArgumentCaptor;
73 import org.mockito.Mock;
74 import org.mockito.MockitoAnnotations;
75 import org.onap.policy.common.utils.services.OrderedServiceImpl;
76 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
77 import org.onap.policy.drools.core.PolicySession;
78 import org.onap.policy.drools.core.lock.Lock;
79 import org.onap.policy.drools.core.lock.LockCallback;
80 import org.onap.policy.drools.core.lock.LockState;
81 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
82 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
83 import org.onap.policy.drools.system.PolicyEngine;
84 import org.onap.policy.drools.system.PolicyEngineConstants;
85 import org.powermock.reflect.Whitebox;
86
87 public class DistributedLockManagerTest {
    // delays, in seconds, that the tests pass to runChecker(): EXPIRE_SEC for a check
    // scheduled with the normal interval, RETRY_SEC for the sooner re-check after a DB
    // failure
    private static final long EXPIRE_SEC = 900L;
    private static final long RETRY_SEC = 60L;
    // name of the PolicyEngine field that is read and replaced via Whitebox
    private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
    // host and owner values written into DB records that should not be treated as
    // belonging to the feature under test
    private static final String OTHER_HOST = "other-host";
    private static final String OTHER_OWNER = "other-owner";
    // message attached to exceptions that the tests throw deliberately
    private static final String EXPECTED_EXCEPTION = "expected exception";
    // in-memory H2 database settings
    private static final String DB_CONNECTION =
                    "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
    private static final String DB_USER = "user";
    private static final String DB_PASSWORD = "password";
    // owner key and resource IDs used when requesting locks
    private static final String OWNER_KEY = "my key";
    private static final String RESOURCE = "my resource";
    private static final String RESOURCE2 = "my resource #2";
    private static final String RESOURCE3 = "my resource #3";
    private static final String RESOURCE4 = "my resource #4";
    private static final String RESOURCE5 = "my resource #5";
    // hold times, in seconds, used when requesting and extending locks
    private static final int HOLD_SEC = 100;
    private static final int HOLD_SEC2 = 120;
    // NOTE(review): not referenced in this part of the file - presumably limits for
    // multi-threaded tests further down; confirm
    private static final int MAX_THREADS = 5;
    private static final int MAX_LOOPS = 100;
    // arguments for the InvalidDbLockingFeature constructor - presumably select whether
    // the simulated SQL failure is transient or permanent; confirm against that class
    private static final boolean TRANSIENT = true;
    private static final boolean PERMANENT = false;

    // number of execute() calls before the first lock attempt
    private static final int PRE_LOCK_EXECS = 1;

    // number of execute() calls before the first schedule attempt
    private static final int PRE_SCHED_EXECS = 1;

    // shared DB connection, opened once for the whole class
    private static Connection conn = null;
    // executor found in the PolicyEngine before the tests; put back after all tests
    private static ScheduledExecutorService saveExec;
    // real thread pool created in setUpBeforeClass(); its users are not in this part
    // of the file
    private static ScheduledExecutorService realExec;

    @Mock
    private PolicyEngine engine;

    @Mock
    private KieSession kieSess;

    @Mock
    private ScheduledExecutorService exsvc;

    @Mock
    private ScheduledFuture<?> checker;

    @Mock
    private LockCallback callback;

    @Mock
    private BasicDataSource datasrc;

    // lock under test; assigned by the individual tests
    private DistributedLock lock;
    // session whose insertDrools() immediately runs the inserted Runnable
    private PolicySession session;

    // counters reset to zero before every test
    private AtomicInteger nactive;
    private AtomicInteger nsuccesses;
    // lock manager under test; re-created before every test
    private DistributedLockManager feature;
145
146
147     /**
148      * Configures the location of the property files and creates the DB.
149      *
150      * @throws SQLException if the DB cannot be created
151      */
152     @BeforeClass
153     public static void setUpBeforeClass() throws SQLException {
154         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
155
156         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
157
158         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
159                         + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
160                         + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
161             createStmt.executeUpdate();
162         }
163
164         saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
165
166         realExec = Executors.newScheduledThreadPool(3);
167     }
168
    /**
     * Puts the saved executor back into the policy engine, shuts down the real thread
     * pool and closes the shared DB connection.
     *
     * @throws SQLException if the connection cannot be closed
     */
    @AfterClass
    public static void tearDownAfterClass() throws SQLException {
        Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
        realExec.shutdown();
        conn.close();
    }
178
179     /**
180      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
181      * tasks.
182      *
183      * @throws SQLException if the lock records cannot be deleted from the DB
184      */
185     @Before
186     public void setUp() throws SQLException {
187         MockitoAnnotations.initMocks(this);
188
189         // grant() and deny() calls will come through here and be immediately executed
190         session = new PolicySession(null, null, kieSess) {
191             @Override
192             public void insertDrools(Object object) {
193                 ((Runnable) object).run();
194             }
195         };
196
197         session.setPolicySession();
198
199         nactive = new AtomicInteger(0);
200         nsuccesses = new AtomicInteger(0);
201
202         cleanDb();
203
204         feature = new MyLockingFeature(true);
205     }
206
    /**
     * Stops the feature, if any, and then removes all lock records from the DB.
     *
     * @throws SQLException if the lock records cannot be deleted from the DB
     */
    @After
    public void tearDown() throws SQLException {
        shutdownFeature();
        cleanDb();
    }
212
213     private void cleanDb() throws SQLException {
214         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
215             stmt.executeUpdate();
216         }
217     }
218
219     private void shutdownFeature() {
220         if (feature != null) {
221             feature.afterStop(engine);
222             feature = null;
223         }
224     }
225
226     /**
227      * Tests that the feature is found in the expected service sets.
228      */
229     @Test
230     public void testServiceApis() {
231         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
232                         .anyMatch(obj -> obj instanceof DistributedLockManager));
233     }
234
235     @Test
236     public void testGetSequenceNumber() {
237         assertEquals(1000, feature.getSequenceNumber());
238     }
239
240     @Test
241     public void testBeforeCreateLockManager() {
242         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
243     }
244
245     /**
246      * Tests beforeCreate(), when getProperties() throws a runtime exception.
247      */
248     @Test
249     public void testBeforeCreateLockManagerEx() {
250         shutdownFeature();
251
252         feature = new MyLockingFeature(false) {
253             @Override
254             protected Properties getProperties(String fileName) {
255                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
256             }
257         };
258
259         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
260                         .isInstanceOf(DistributedLockManagerException.class);
261     }
262
    /**
     * Verifies that, for the feature created by setUp(), exactly one task has been
     * passed to execute() and exactly one to schedule() on the executor.
     */
    @Test
    public void testAfterStart() {
        // verify that cleanup & expire check are both added to the queue
        verify(exsvc).execute(any());
        verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
    }
269
270     /**
271      * Tests afterStart(), when thread pool throws a runtime exception.
272      */
273     @Test
274     public void testAfterStartExInThreadPool() {
275         shutdownFeature();
276
277         feature = new MyLockingFeature(false);
278
279         doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
280
281         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
282     }
283
284     @Test
285     public void testDeleteExpiredDbLocks() throws SQLException {
286         // add records: two expired, one not
287         insertRecord(RESOURCE, feature.getUuidString(), -1);
288         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
289         insertRecord(RESOURCE3, OTHER_OWNER, 0);
290         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
291
292         // get the clean-up function and execute it
293         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
294         verify(exsvc).execute(captor.capture());
295
296         long tbegin = System.currentTimeMillis();
297         Runnable action = captor.getValue();
298         action.run();
299
300         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
301         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
302         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
303         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
304
305         assertEquals(2, getRecordCount());
306     }
307
308     /**
309      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
310      *
311      * @throws SQLException if an error occurs
312      */
313     @Test
314     public void testDeleteExpiredDbLocksEx() throws SQLException {
315         feature = new InvalidDbLockingFeature(TRANSIENT);
316
317         // get the clean-up function and execute it
318         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
319         verify(exsvc).execute(captor.capture());
320
321         Runnable action = captor.getValue();
322
323         // should not throw an exception
324         action.run();
325     }
326
    /**
     * Tests afterStop(): stopping the feature created by setUp() should cancel the
     * scheduled future ({@link #checker}), and stopping a manager on which afterStart()
     * was never called should not fail.
     */
    @Test
    public void testAfterStop() {
        shutdownFeature();
        verify(checker).cancel(anyBoolean());

        feature = new DistributedLockManager();

        // shutdown without calling afterStart()

        shutdownFeature();
    }
338
    /**
     * Tests afterStop(), when the data source throws an exception when close() is called.
     * The test passes as long as the second shutdown does not propagate an exception.
     *
     * @throws SQLException if an error occurs
     */
    @Test
    public void testAfterStopEx() throws SQLException {
        shutdownFeature();

        // use a data source that throws an exception when closed
        feature = new InvalidDbLockingFeature(TRANSIENT);

        shutdownFeature();
    }
353
    /**
     * Tests createLock(): the first request for a resource waits, later requests for the
     * same resource are denied immediately, and the first request becomes active (with a
     * DB record) once its queued task is run.
     *
     * @throws SQLException if the DB cannot be queried
     */
    @Test
    public void testCreateLock() throws SQLException {
        // only the start-up task has been queued so far
        verify(exsvc).execute(any());

        lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
        assertTrue(lock.isWaiting());

        // the lock request added one more task
        verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());

        // this lock should fail
        LockCallback callback2 = mock(LockCallback.class);
        DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
        assertTrue(lock2.isUnavailable());
        verify(callback2, never()).lockAvailable(lock2);
        verify(callback2).lockUnavailable(lock2);

        // this should fail, too
        LockCallback callback3 = mock(LockCallback.class);
        DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
        assertTrue(lock3.isUnavailable());
        verify(callback3, never()).lockAvailable(lock3);
        verify(callback3).lockUnavailable(lock3);

        // no change to first
        assertTrue(lock.isWaiting());

        // no callbacks to the first lock
        verify(callback, never()).lockAvailable(lock);
        verify(callback, never()).lockUnavailable(lock);

        // nothing has been written to the DB while the request is still queued
        assertTrue(lock.isWaiting());
        assertEquals(0, getRecordCount());

        // run the queued request for the first lock
        runLock(0, 0);
        assertTrue(lock.isActive());
        assertEquals(1, getRecordCount());

        verify(callback).lockAvailable(lock);
        verify(callback, never()).lockUnavailable(lock);

        // this should succeed
        DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
        assertTrue(lock4.isWaiting());

        // after running checker, original records should still remain
        runChecker(0, 0, EXPIRE_SEC);
        assertEquals(1, getRecordCount());
        verify(callback, never()).lockUnavailable(lock);
    }
403
404     /**
405      * Tests createLock() when the feature is not the latest instance.
406      */
407     @Test
408     public void testCreateLockNotLatestInstance() {
409         DistributedLockManager.setLatestInstance(null);
410
411         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
412         assertTrue(lock.isUnavailable());
413         verify(callback, never()).lockAvailable(any());
414         verify(callback).lockUnavailable(lock);
415     }
416
    /**
     * Tests checkExpired(): with five active locks, three DB records are altered (one
     * expired, one with a different host, one with a different owner); after the checker
     * runs, exactly those three locks should be unavailable and their callbacks notified.
     *
     * @throws SQLException if the DB cannot be accessed
     */
    @Test
    public void testCheckExpired() throws SQLException {
        lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
        runLock(0, 0);

        LockCallback callback2 = mock(LockCallback.class);
        final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
        runLock(1, 0);

        LockCallback callback3 = mock(LockCallback.class);
        final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
        runLock(2, 0);

        LockCallback callback4 = mock(LockCallback.class);
        final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
        runLock(3, 0);

        LockCallback callback5 = mock(LockCallback.class);
        final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
        runLock(4, 0);

        // every lock should now have a DB record
        assertEquals(5, getRecordCount());

        // expire one record
        updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);

        // change host of another record
        updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);

        // change uuid of another record
        updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);


        // run the checker
        runChecker(0, 0, EXPIRE_SEC);


        // check lock states
        assertTrue(lock.isUnavailable());
        assertTrue(lock2.isActive());
        assertTrue(lock3.isUnavailable());
        assertTrue(lock4.isActive());
        assertTrue(lock5.isUnavailable());

        // allow callbacks
        runLock(2, 2);
        runLock(3, 1);
        runLock(4, 0);
        verify(callback).lockUnavailable(lock);
        verify(callback3).lockUnavailable(lock3);
        verify(callback5).lockUnavailable(lock5);

        // the untouched locks must not have been reported as lost
        verify(callback2, never()).lockUnavailable(lock2);
        verify(callback4, never()).lockUnavailable(lock4);


        // another check should have been scheduled, with the normal interval
        runChecker(1, 0, EXPIRE_SEC);
    }
476
477     /**
478      * Tests checkExpired(), when schedule() throws an exception.
479      */
480     @Test
481     public void testCheckExpiredExecRejected() {
482         // arrange for execution to be rejected
483         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
484                         .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
485
486         runChecker(0, 0, EXPIRE_SEC);
487     }
488
    /**
     * Tests checkExpired(), when getConnection() throws an exception; the check should
     * be rescheduled using the shorter retry delay.
     */
    @Test
    public void testCheckExpiredSqlEx() {
        // use a data source that throws an exception when getConnection() is called
        feature = new InvalidDbLockingFeature(TRANSIENT);

        // run the initial check, scheduled with the normal delay
        runChecker(0, 0, EXPIRE_SEC);

        // it should have scheduled another check, sooner
        runChecker(0, 0, RETRY_SEC);
    }
502
503     /**
504      * Tests checkExpired(), when getConnection() throws an exception and the feature is
505      * no longer alive.
506      */
507     @Test
508     public void testCheckExpiredSqlExFeatureStopped() {
509         // use a data source that throws an exception when getConnection() is called
510         feature = new InvalidDbLockingFeature(TRANSIENT) {
511             @Override
512             protected SQLException makeEx() {
513                 this.stop();
514                 return super.makeEx();
515             }
516         };
517
518         runChecker(0, 0, EXPIRE_SEC);
519
520         // it should NOT have scheduled another check
521         verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
522     }
523
    /**
     * Tests the expiration check when one lock's record has expired, one lock is still
     * waiting, and another lock is freed while the checker is obtaining its DB
     * connection.
     *
     * @throws SQLException if the DB cannot be accessed
     */
    @Test
    public void testExpireLocks() throws SQLException {
        // lock to be freed the next time a connection is requested, if any
        AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);

        feature = new MyLockingFeature(true) {
            @Override
            protected BasicDataSource makeDataSource() throws Exception {
                // get the real data source
                BasicDataSource src2 = super.makeDataSource();

                // hand out real connections, but free the designated lock first
                when(datasrc.getConnection()).thenAnswer(answer -> {
                    DistributedLock lck = freeLock.getAndSet(null);
                    if (lck != null) {
                        // free it
                        lck.free();

                        // run its doUnlock
                        runLock(4, 0);
                    }

                    return src2.getConnection();
                });

                return datasrc;
            }
        };

        lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
        runLock(0, 0);

        LockCallback callback2 = mock(LockCallback.class);
        final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
        runLock(1, 0);

        LockCallback callback3 = mock(LockCallback.class);
        final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
        // don't run doLock for lock3 - leave it in the waiting state

        LockCallback callback4 = mock(LockCallback.class);
        final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
        runLock(3, 0);

        // lock3 has no record yet, as its request was never run
        assertEquals(3, getRecordCount());

        // expire one record
        updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);

        // arrange to free lock4 while the checker is running
        freeLock.set(lock4);

        // run the checker
        runChecker(0, 0, EXPIRE_SEC);


        // check lock states
        assertTrue(lock.isUnavailable());
        assertTrue(lock2.isActive());
        assertTrue(lock3.isWaiting());
        assertTrue(lock4.isUnavailable());

        runLock(4, 0);
        verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());

        // only the lock whose record expired should have been reported as lost
        verify(callback).lockUnavailable(lock);
        verify(callback2, never()).lockUnavailable(lock2);
        verify(callback3, never()).lockUnavailable(lock3);
        verify(callback4, never()).lockUnavailable(lock4);
    }
592
593     @Test
594     public void testDistributedLockNoArgs() {
595         DistributedLock lock = new DistributedLock();
596         assertNull(lock.getResourceId());
597         assertNull(lock.getOwnerKey());
598         assertNull(lock.getCallback());
599         assertEquals(0, lock.getHoldSec());
600     }
601
    /**
     * Tests hold-time validation when a lock is created: a negative hold time is
     * rejected with an exception whose message mentions "holdSec", while a valid hold
     * time is accepted.
     */
    @Test
    public void testDistributedLock() {
        assertThatIllegalArgumentException()
                        .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
                        .withMessageContaining("holdSec");

        // should generate no exception
        feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
    }
611
612     @Test
613     public void testDistributedLockSerializable() throws Exception {
614         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
615         lock = roundTrip(lock);
616
617         assertTrue(lock.isWaiting());
618
619         assertEquals(RESOURCE, lock.getResourceId());
620         assertEquals(OWNER_KEY, lock.getOwnerKey());
621         assertNull(lock.getCallback());
622         assertEquals(HOLD_SEC, lock.getHoldSec());
623     }
624
    /**
     * Tests that a lock becomes active, and its callback is told that the lock is
     * available, once the queued doLock() request is run.
     */
    @Test
    public void testGrant() {
        lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
        assertFalse(lock.isActive());

        // execute the doLock() call
        runLock(0, 0);

        assertTrue(lock.isActive());

        // the callback for the lock should have been run in the foreground thread
        verify(callback).lockAvailable(lock);
    }
638
639     @Test
640     public void testDistributedLockDeny() {
641         // get a lock
642         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
643
644         // get another lock - should fail
645         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
646
647         assertTrue(lock.isUnavailable());
648
649         // the callback for the second lock should have been run in the foreground thread
650         verify(callback).lockUnavailable(lock);
651
652         // should only have a request for the first lock
653         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
654     }
655
656     @Test
657     public void testDistributedLockFree() {
658         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
659
660         assertTrue(lock.free());
661         assertTrue(lock.isUnavailable());
662
663         // run both requests associated with the lock
664         runLock(0, 1);
665         runLock(1, 0);
666
667         // should not have changed state
668         assertTrue(lock.isUnavailable());
669
670         // attempt to free it again
671         assertFalse(lock.free());
672
673         // should not have queued anything else
674         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
675
676         // new lock should succeed
677         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
678         assertTrue(lock2 != lock);
679         assertTrue(lock2.isWaiting());
680     }
681
682     /**
683      * Tests that free() works on a serialized lock with a new feature.
684      *
685      * @throws Exception if an error occurs
686      */
687     @Test
688     public void testDistributedLockFreeSerialized() throws Exception {
689         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
690
691         feature = new MyLockingFeature(true);
692
693         lock = roundTrip(lock);
694         assertTrue(lock.free());
695         assertTrue(lock.isUnavailable());
696     }
697
698     /**
699      * Tests free() on a serialized lock without a feature.
700      *
701      * @throws Exception if an error occurs
702      */
703     @Test
704     public void testDistributedLockFreeNoFeature() throws Exception {
705         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
706
707         DistributedLockManager.setLatestInstance(null);
708
709         lock = roundTrip(lock);
710         assertFalse(lock.free());
711         assertTrue(lock.isUnavailable());
712     }
713
714     /**
715      * Tests the case where the lock is freed and doUnlock called between the call to
716      * isUnavailable() and the call to compute().
717      */
718     @Test
719     public void testDistributedLockFreeUnlocked() {
720         feature = new FreeWithFreeLockingFeature(true);
721
722         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
723
724         assertFalse(lock.free());
725         assertTrue(lock.isUnavailable());
726     }
727
728     /**
729      * Tests the case where the lock is freed, but doUnlock is not completed, between the
730      * call to isUnavailable() and the call to compute().
731      */
732     @Test
733     public void testDistributedLockFreeLockFreed() {
734         feature = new FreeWithFreeLockingFeature(false);
735
736         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
737
738         assertFalse(lock.free());
739         assertTrue(lock.isUnavailable());
740     }
741
    /**
     * Tests extend(): extending a denied lock is denied again (even if its state is
     * forced to active), a negative hold time is rejected, and extending an active lock
     * puts it back in the waiting state until the queued request is run.
     */
    @Test
    public void testDistributedLockExtend() {
        lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);

        // lock2 should be denied - called back by this thread
        DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
        verify(callback, never()).lockAvailable(lock2);
        verify(callback).lockUnavailable(lock2);

        // lock2 will still be denied - called back by this thread
        lock2.extend(HOLD_SEC, callback);
        verify(callback, times(2)).lockUnavailable(lock2);

        // force lock2 to be active - should still be denied
        Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
        lock2.extend(HOLD_SEC, callback);
        verify(callback, times(3)).lockUnavailable(lock2);

        // a negative hold time is rejected
        assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
                        .withMessageContaining("holdSec");

        // execute doLock()
        runLock(0, 0);
        assertTrue(lock.isActive());

        // now extend the first lock
        LockCallback callback2 = mock(LockCallback.class);
        lock.extend(HOLD_SEC2, callback2);
        assertTrue(lock.isWaiting());

        // execute doExtend()
        runLock(1, 0);
        lock.extend(HOLD_SEC2, callback2);
        assertEquals(HOLD_SEC2, lock.getHoldSec());
        verify(callback2).lockAvailable(lock);
        verify(callback2, never()).lockUnavailable(lock);
    }
779
780     /**
781      * Tests that extend() works on a serialized lock with a new feature.
782      *
783      * @throws Exception if an error occurs
784      */
785     @Test
786     public void testDistributedLockExtendSerialized() throws Exception {
787         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
788
789         // run doLock
790         runLock(0, 0);
791         assertTrue(lock.isActive());
792
793         feature = new MyLockingFeature(true);
794
795         lock = roundTrip(lock);
796         assertTrue(lock.isActive());
797
798         LockCallback scallback = mock(LockCallback.class);
799
800         lock.extend(HOLD_SEC, scallback);
801         assertTrue(lock.isWaiting());
802
803         // run doExtend (in new feature)
804         runLock(0, 0);
805         assertTrue(lock.isActive());
806
807         verify(scallback).lockAvailable(lock);
808         verify(scallback, never()).lockUnavailable(lock);
809     }
810
811     /**
812      * Tests extend() on a serialized lock without a feature.
813      *
814      * @throws Exception if an error occurs
815      */
816     @Test
817     public void testDistributedLockExtendNoFeature() throws Exception {
818         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
819
820         // run doLock
821         runLock(0, 0);
822         assertTrue(lock.isActive());
823
824         DistributedLockManager.setLatestInstance(null);
825
826         lock = roundTrip(lock);
827         assertTrue(lock.isActive());
828
829         LockCallback scallback = mock(LockCallback.class);
830
831         lock.extend(HOLD_SEC, scallback);
832         assertTrue(lock.isUnavailable());
833
834         verify(scallback, never()).lockAvailable(lock);
835         verify(scallback).lockUnavailable(lock);
836     }
837
    /**
     * Tests extend() in the case where the lock is freed and doUnlock called between the
     * call to isUnavailable() and the call to compute(). The extension should be denied.
     */
    @Test
    public void testDistributedLockExtendUnlocked() {
        feature = new FreeWithFreeLockingFeature(true);

        lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);

        // the lock should end up unavailable, with the callback notified
        lock.extend(HOLD_SEC2, callback);
        assertTrue(lock.isUnavailable());
        verify(callback).lockUnavailable(lock);
    }
852
853     /**
854      * Tests the case where the lock is freed, but doUnlock is not completed, between the
855      * call to isUnavailable() and the call to compute().
856      */
857     @Test
858     public void testDistributedLockExtendLockFreed() {
859         feature = new FreeWithFreeLockingFeature(false);
860
861         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
862
863         lock.extend(HOLD_SEC2, callback);
864         assertTrue(lock.isUnavailable());
865         verify(callback).lockUnavailable(lock);
866     }
867
868     @Test
869     public void testDistributedLockScheduleRequest() {
870         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
871         runLock(0, 0);
872
873         verify(callback).lockAvailable(lock);
874     }
875
876     @Test
877     public void testDistributedLockRescheduleRequest() throws SQLException {
878         // use a data source that throws an exception when getConnection() is called
879         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
880         feature = invfeat;
881
882         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
883
884         // invoke doLock - should fail and reschedule
885         runLock(0, 0);
886
887         // should still be waiting
888         assertTrue(lock.isWaiting());
889         verify(callback, never()).lockUnavailable(lock);
890
891         // free the lock while doLock is executing
892         invfeat.freeLock = true;
893
894         // try scheduled request - should just invoke doUnlock
895         runSchedule(0, 0);
896
897         // should still be waiting
898         assertTrue(lock.isUnavailable());
899         verify(callback, never()).lockUnavailable(lock);
900
901         // should have scheduled a retry of doUnlock
902         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
903     }
904
905     @Test
906     public void testDistributedLockGetNextRequest() {
907         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
908
909         /*
910          * run doLock. This should cause getNextRequest() to be called twice, once with a
911          * request in the queue, and the second time with request=null.
912          */
913         runLock(0, 0);
914     }
915
916     /**
917      * Tests getNextRequest(), where the same request is still in the queue the second
918      * time it's called.
919      */
920     @Test
921     public void testDistributedLockGetNextRequestSameRequest() {
922         // force reschedule to be invoked
923         feature = new InvalidDbLockingFeature(TRANSIENT);
924
925         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
926
927         /*
928          * run doLock. This should cause getNextRequest() to be called twice, once with a
929          * request in the queue, and the second time with the same request again.
930          */
931         runLock(0, 0);
932
933         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
934     }
935
936     @Test
937     public void testDistributedLockDoRequest() throws SQLException {
938         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
939
940         assertTrue(lock.isWaiting());
941
942         // run doLock via doRequest
943         runLock(0, 0);
944
945         assertTrue(lock.isActive());
946     }
947
948     /**
949      * Tests doRequest(), when doRequest() is already running within another thread.
950      */
951     @Test
952     public void testDistributedLockDoRequestBusy() {
953         /*
954          * this feature will invoke a request in a background thread while it's being run
955          * in a foreground thread.
956          */
957         AtomicBoolean running = new AtomicBoolean(false);
958         AtomicBoolean returned = new AtomicBoolean(false);
959
960         feature = new MyLockingFeature(true) {
961             @Override
962             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
963                             LockCallback callback) {
964                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
965                     private static final long serialVersionUID = 1L;
966
967                     @Override
968                     protected boolean doDbInsert(Connection conn) throws SQLException {
969                         if (running.get()) {
970                             // already inside the thread - don't recurse any further
971                             return super.doDbInsert(conn);
972                         }
973
974                         running.set(true);
975
976                         Thread thread = new Thread(() -> {
977                             // run doLock from within the new thread
978                             runLock(0, 0);
979                         });
980                         thread.setDaemon(true);
981                         thread.start();
982
983                         // wait for the background thread to complete before continuing
984                         try {
985                             thread.join(5000);
986                         } catch (InterruptedException ignore) {
987                             Thread.currentThread().interrupt();
988                         }
989
990                         returned.set(!thread.isAlive());
991
992                         return super.doDbInsert(conn);
993                     }
994                 };
995             }
996         };
997
998         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
999
1000         // run doLock
1001         runLock(0, 0);
1002
1003         assertTrue(returned.get());
1004     }
1005
1006     /**
1007      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1008      *
1009      * @throws SQLException if an error occurs
1010      */
1011     @Test
1012     public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1013         // throw run-time exception
1014         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1015
1016         // use a data source that throws an exception when getConnection() is called
1017         feature = new MyLockingFeature(true) {
1018             @Override
1019             protected BasicDataSource makeDataSource() throws Exception {
1020                 return datasrc;
1021             }
1022         };
1023
1024         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1025
1026         // invoke doLock - should NOT reschedule
1027         runLock(0, 0);
1028
1029         assertTrue(lock.isUnavailable());
1030         verify(callback).lockUnavailable(lock);
1031
1032         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1033     }
1034
1035     /**
1036      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1037      * state.
1038      *
1039      * @throws SQLException if an error occurs
1040      */
1041     @Test
1042     public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1043         // throw run-time exception
1044         when(datasrc.getConnection()).thenAnswer(answer -> {
1045             lock.free();
1046             throw new IllegalStateException(EXPECTED_EXCEPTION);
1047         });
1048
1049         // use a data source that throws an exception when getConnection() is called
1050         feature = new MyLockingFeature(true) {
1051             @Override
1052             protected BasicDataSource makeDataSource() throws Exception {
1053                 return datasrc;
1054             }
1055         };
1056
1057         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1058
1059         // invoke doLock - should NOT reschedule
1060         runLock(0, 0);
1061
1062         assertTrue(lock.isUnavailable());
1063         verify(callback, never()).lockUnavailable(lock);
1064
1065         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1066     }
1067
1068     /**
1069      * Tests doRequest() when the retry count gets exhausted.
1070      */
1071     @Test
1072     public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1073         // use a data source that throws an exception when getConnection() is called
1074         feature = new InvalidDbLockingFeature(TRANSIENT);
1075
1076         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1077
1078         // invoke doLock - should fail and reschedule
1079         runLock(0, 0);
1080
1081         // should still be waiting
1082         assertTrue(lock.isWaiting());
1083         verify(callback, never()).lockUnavailable(lock);
1084
1085         // try again, via SCHEDULER - first retry fails
1086         runSchedule(0, 0);
1087
1088         // should still be waiting
1089         assertTrue(lock.isWaiting());
1090         verify(callback, never()).lockUnavailable(lock);
1091
1092         // try again, via SCHEDULER - final retry fails
1093         runSchedule(1, 0);
1094         assertTrue(lock.isUnavailable());
1095
1096         // now callback should have been called
1097         verify(callback).lockUnavailable(lock);
1098     }
1099
1100     /**
1101      * Tests doRequest() when a non-transient DB exception is thrown.
1102      */
1103     @Test
1104     public void testDistributedLockDoRequestNotTransient() {
1105         /*
1106          * use a data source that throws a PERMANENT exception when getConnection() is
1107          * called
1108          */
1109         feature = new InvalidDbLockingFeature(PERMANENT);
1110
1111         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1112
1113         // invoke doLock - should fail
1114         runLock(0, 0);
1115
1116         assertTrue(lock.isUnavailable());
1117         verify(callback).lockUnavailable(lock);
1118
1119         // should not have scheduled anything new
1120         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1121         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1122     }
1123
1124     @Test
1125     public void testDistributedLockDoLock() throws SQLException {
1126         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1127
1128         // invoke doLock - should simply do an insert
1129         long tbegin = System.currentTimeMillis();
1130         runLock(0, 0);
1131
1132         assertEquals(1, getRecordCount());
1133         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1134         verify(callback).lockAvailable(lock);
1135     }
1136
1137     /**
1138      * Tests doLock() when the lock is freed before doLock runs.
1139      *
1140      * @throws SQLException if an error occurs
1141      */
1142     @Test
1143     public void testDistributedLockDoLockFreed() throws SQLException {
1144         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1145
1146         lock.setState(LockState.UNAVAILABLE);
1147
1148         // invoke doLock - should do nothing
1149         runLock(0, 0);
1150
1151         assertEquals(0, getRecordCount());
1152
1153         verify(callback, never()).lockAvailable(lock);
1154     }
1155
1156     /**
1157      * Tests doLock() when a DB exception is thrown.
1158      */
1159     @Test
1160     public void testDistributedLockDoLockEx() {
1161         // use a data source that throws an exception when getConnection() is called
1162         feature = new InvalidDbLockingFeature(PERMANENT);
1163
1164         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1165
1166         // invoke doLock - should simply do an insert
1167         runLock(0, 0);
1168
1169         // lock should have failed due to exception
1170         verify(callback).lockUnavailable(lock);
1171     }
1172
1173     /**
1174      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1175      * to be called.
1176      */
1177     @Test
1178     public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1179         // insert an expired record
1180         insertRecord(RESOURCE, feature.getUuidString(), 0);
1181
1182         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1183
1184         // invoke doLock - should simply do an update
1185         runLock(0, 0);
1186         verify(callback).lockAvailable(lock);
1187     }
1188
1189     /**
1190      * Tests doLock() when a locked record already exists.
1191      */
1192     @Test
1193     public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1194         // insert an expired record
1195         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1196
1197         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1198
1199         // invoke doLock
1200         runLock(0, 0);
1201
1202         // lock should have failed because it's already locked
1203         verify(callback).lockUnavailable(lock);
1204     }
1205
1206     @Test
1207     public void testDistributedLockDoUnlock() throws SQLException {
1208         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1209
1210         // invoke doLock()
1211         runLock(0, 0);
1212
1213         lock.free();
1214
1215         // invoke doUnlock()
1216         long tbegin = System.currentTimeMillis();
1217         runLock(1, 0);
1218
1219         assertEquals(0, getRecordCount());
1220         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1221
1222         assertTrue(lock.isUnavailable());
1223
1224         // no more callbacks should have occurred
1225         verify(callback, times(1)).lockAvailable(lock);
1226         verify(callback, never()).lockUnavailable(lock);
1227     }
1228
1229     /**
1230      * Tests doUnlock() when a DB exception is thrown.
1231      *
1232      * @throws SQLException if an error occurs
1233      */
1234     @Test
1235     public void testDistributedLockDoUnlockEx() throws SQLException {
1236         feature = new InvalidDbLockingFeature(PERMANENT);
1237
1238         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1239
1240         // do NOT invoke doLock() - it will fail without a DB connection
1241
1242         lock.free();
1243
1244         // invoke doUnlock()
1245         runLock(1, 0);
1246
1247         assertTrue(lock.isUnavailable());
1248
1249         // no more callbacks should have occurred
1250         verify(callback, never()).lockAvailable(lock);
1251         verify(callback, never()).lockUnavailable(lock);
1252     }
1253
1254     @Test
1255     public void testDistributedLockDoExtend() throws SQLException {
1256         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1257         runLock(0, 0);
1258
1259         LockCallback callback2 = mock(LockCallback.class);
1260         lock.extend(HOLD_SEC2, callback2);
1261
1262         // call doExtend()
1263         long tbegin = System.currentTimeMillis();
1264         runLock(1, 0);
1265
1266         assertEquals(1, getRecordCount());
1267         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1268
1269         assertTrue(lock.isActive());
1270
1271         // no more callbacks should have occurred
1272         verify(callback).lockAvailable(lock);
1273         verify(callback, never()).lockUnavailable(lock);
1274
1275         // extension should have succeeded
1276         verify(callback2).lockAvailable(lock);
1277         verify(callback2, never()).lockUnavailable(lock);
1278     }
1279
1280     /**
1281      * Tests doExtend() when the lock is freed before doExtend runs.
1282      *
1283      * @throws SQLException if an error occurs
1284      */
1285     @Test
1286     public void testDistributedLockDoExtendFreed() throws SQLException {
1287         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1288         lock.extend(HOLD_SEC2, callback);
1289
1290         lock.setState(LockState.UNAVAILABLE);
1291
1292         // invoke doExtend - should do nothing
1293         runLock(1, 0);
1294
1295         assertEquals(0, getRecordCount());
1296
1297         verify(callback, never()).lockAvailable(lock);
1298     }
1299
1300     /**
1301      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1302      * insert.
1303      *
1304      * @throws SQLException if an error occurs
1305      */
1306     @Test
1307     public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1308         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1309         runLock(0, 0);
1310
1311         LockCallback callback2 = mock(LockCallback.class);
1312         lock.extend(HOLD_SEC2, callback2);
1313
1314         // delete the record so it's forced to re-insert it
1315         cleanDb();
1316
1317         // call doExtend()
1318         long tbegin = System.currentTimeMillis();
1319         runLock(1, 0);
1320
1321         assertEquals(1, getRecordCount());
1322         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1323
1324         assertTrue(lock.isActive());
1325
1326         // no more callbacks should have occurred
1327         verify(callback).lockAvailable(lock);
1328         verify(callback, never()).lockUnavailable(lock);
1329
1330         // extension should have succeeded
1331         verify(callback2).lockAvailable(lock);
1332         verify(callback2, never()).lockUnavailable(lock);
1333     }
1334
1335     /**
1336      * Tests doExtend() when both update and insert fail.
1337      *
1338      * @throws SQLException if an error occurs
1339      */
1340     @Test
1341     public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1342         /*
1343          * this feature will create a lock that returns false when doDbUpdate() is
1344          * invoked, or when doDbInsert() is invoked a second time
1345          */
1346         feature = new MyLockingFeature(true) {
1347             @Override
1348             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1349                             LockCallback callback) {
1350                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1351                     private static final long serialVersionUID = 1L;
1352                     private int ntimes = 0;
1353
1354                     @Override
1355                     protected boolean doDbInsert(Connection conn) throws SQLException {
1356                         if (ntimes++ > 0) {
1357                             return false;
1358                         }
1359
1360                         return super.doDbInsert(conn);
1361                     }
1362
1363                     @Override
1364                     protected boolean doDbUpdate(Connection conn) {
1365                         return false;
1366                     }
1367                 };
1368             }
1369         };
1370
1371         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1372         runLock(0, 0);
1373
1374         LockCallback callback2 = mock(LockCallback.class);
1375         lock.extend(HOLD_SEC2, callback2);
1376
1377         // call doExtend()
1378         runLock(1, 0);
1379
1380         assertTrue(lock.isUnavailable());
1381
1382         // no more callbacks should have occurred
1383         verify(callback).lockAvailable(lock);
1384         verify(callback, never()).lockUnavailable(lock);
1385
1386         // extension should have failed
1387         verify(callback2, never()).lockAvailable(lock);
1388         verify(callback2).lockUnavailable(lock);
1389     }
1390
1391     /**
1392      * Tests doExtend() when an exception occurs.
1393      *
1394      * @throws SQLException if an error occurs
1395      */
1396     @Test
1397     public void testDistributedLockDoExtendEx() throws SQLException {
1398         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1399         runLock(0, 0);
1400
1401         /*
1402          * delete the record and insert one with a different owner, which will cause
1403          * doDbInsert() to throw an exception
1404          */
1405         cleanDb();
1406         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1407
1408         LockCallback callback2 = mock(LockCallback.class);
1409         lock.extend(HOLD_SEC2, callback2);
1410
1411         // call doExtend()
1412         runLock(1, 0);
1413
1414         assertTrue(lock.isUnavailable());
1415
1416         // no more callbacks should have occurred
1417         verify(callback).lockAvailable(lock);
1418         verify(callback, never()).lockUnavailable(lock);
1419
1420         // extension should have failed
1421         verify(callback2, never()).lockAvailable(lock);
1422         verify(callback2).lockUnavailable(lock);
1423     }
1424
1425     @Test
1426     public void testDistributedLockToString() {
1427         String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1428         assertNotNull(text);
1429         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1430     }
1431
1432     @Test
1433     public void testMakeThreadPool() {
1434         // use a REAL feature to test this
1435         feature = new DistributedLockManager();
1436
1437         // this should create a thread pool
1438         feature.beforeCreateLockManager(engine, new Properties());
1439         feature.afterStart(engine);
1440
1441         shutdownFeature();
1442     }
1443
1444     /**
1445      * Performs a multi-threaded test of the locking facility.
1446      *
1447      * @throws InterruptedException if the current thread is interrupted while waiting for
1448      *         the background threads to complete
1449      */
1450     @Test
1451     public void testMultiThreaded() throws InterruptedException {
1452         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1453
1454         feature = new DistributedLockManager();
1455         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1456         feature.afterStart(PolicyEngineConstants.getManager());
1457
1458         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1459         for (int x = 0; x < MAX_THREADS; ++x) {
1460             threads.add(new MyThread());
1461         }
1462
1463         threads.forEach(Thread::start);
1464
1465         for (MyThread thread : threads) {
1466             thread.join(6000);
1467             assertFalse(thread.isAlive());
1468         }
1469
1470         for (MyThread thread : threads) {
1471             if (thread.err != null) {
1472                 throw thread.err;
1473             }
1474         }
1475
1476         assertTrue(nsuccesses.get() > 0);
1477     }
1478
1479     private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1480                     boolean waitForLock) {
1481         return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1482     }
1483
1484     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1485         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1486         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1487             oos.writeObject(lock);
1488         }
1489
1490         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1491         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1492             return (DistributedLock) ois.readObject();
1493         }
1494     }
1495
1496     /**
1497      * Runs the checkExpired() action.
1498      *
1499      * @param nskip number of actions in the work queue to skip
1500      * @param nadditional number of additional actions that appear in the work queue
1501      *        <i>after</i> the checkExpired action
1502      * @param schedSec number of seconds for which the checker should have been scheduled
1503      */
1504     private void runChecker(int nskip, int nadditional, long schedSec) {
1505         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1506         verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1507         Runnable action = captor.getAllValues().get(nskip);
1508         action.run();
1509     }
1510
1511     /**
1512      * Runs a lock action (e.g., doLock, doUnlock).
1513      *
1514      * @param nskip number of actions in the work queue to skip
1515      * @param nadditional number of additional actions that appear in the work queue
1516      *        <i>after</i> the desired action
1517      */
1518     void runLock(int nskip, int nadditional) {
1519         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1520         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1521
1522         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1523         action.run();
1524     }
1525
1526     /**
1527      * Runs a scheduled action (e.g., "retry" action).
1528      *
1529      * @param nskip number of actions in the work queue to skip
1530      * @param nadditional number of additional actions that appear in the work queue
1531      *        <i>after</i> the desired action
1532      */
1533     void runSchedule(int nskip, int nadditional) {
1534         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1535         verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1536
1537         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1538         action.run();
1539     }
1540
1541     /**
1542      * Gets a count of the number of lock records in the DB.
1543      *
1544      * @return the number of lock records in the DB
1545      * @throws SQLException if an error occurs accessing the DB
1546      */
1547     private int getRecordCount() throws SQLException {
1548         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1549                         ResultSet result = stmt.executeQuery()) {
1550
1551             if (result.next()) {
1552                 return result.getInt(1);
1553
1554             } else {
1555                 return 0;
1556             }
1557         }
1558     }
1559
1560     /**
1561      * Determines if there is a record for the given resource whose expiration time is in
1562      * the expected range.
1563      *
1564      * @param resourceId ID of the resource of interest
1565      * @param uuidString UUID string of the owner
1566      * @param holdSec seconds for which the lock was to be held
1567      * @param tbegin earliest time, in milliseconds, at which the record could have been
1568      *        inserted into the DB
1569      * @return {@code true} if a record is found, {@code false} otherwise
1570      * @throws SQLException if an error occurs accessing the DB
1571      */
1572     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1573         try (PreparedStatement stmt =
1574                         conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1575                                         + " WHERE resourceId=? AND host=? AND owner=?")) {
1576
1577             stmt.setString(1, resourceId);
1578             stmt.setString(2, feature.getHostName());
1579             stmt.setString(3, uuidString);
1580
1581             try (ResultSet result = stmt.executeQuery()) {
1582                 if (result.next()) {
1583                     int remaining = result.getInt(1);
1584                     long maxDiff = System.currentTimeMillis() - tbegin;
1585                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1586
1587                 } else {
1588                     return false;
1589                 }
1590             }
1591         }
1592     }
1593
1594     /**
1595      * Inserts a record into the DB.
1596      *
1597      * @param resourceId ID of the resource of interest
1598      * @param uuidString UUID string of the owner
1599      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1600      * @throws SQLException if an error occurs accessing the DB
1601      */
1602     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1603         this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
1604     }
1605
1606     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1607                     throws SQLException {
1608         try (PreparedStatement stmt =
1609                         conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1610                                         + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1611
1612             stmt.setString(1, resourceId);
1613             stmt.setString(2, hostName);
1614             stmt.setString(3, uuidString);
1615             stmt.setInt(4, expireOffset);
1616
1617             assertEquals(1, stmt.executeUpdate());
1618         }
1619     }
1620
1621     /**
1622      * Updates a record in the DB.
1623      *
1624      * @param resourceId ID of the resource of interest
1625      * @param newUuid UUID string of the <i>new</i> owner
1626      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1627      * @throws SQLException if an error occurs accessing the DB
1628      */
1629     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1630         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1631                         + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1632
1633             stmt.setString(1, newHost);
1634             stmt.setString(2, newUuid);
1635             stmt.setInt(3, expireOffset);
1636             stmt.setString(4, resourceId);
1637
1638             assertEquals(1, stmt.executeUpdate());
1639         }
1640     }
1641
1642     /**
1643      * Feature that uses <i>exsvc</i> to execute requests.
1644      */
1645     private class MyLockingFeature extends DistributedLockManager {
1646
1647         public MyLockingFeature(boolean init) {
1648             shutdownFeature();
1649
1650             exsvc = mock(ScheduledExecutorService.class);
1651             when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1652             Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1653
1654             if (init) {
1655                 beforeCreateLockManager(engine, new Properties());
1656                 start();
1657                 afterStart(engine);
1658             }
1659         }
1660     }
1661
1662     /**
1663      * Feature whose data source all throws exceptions.
1664      */
1665     private class InvalidDbLockingFeature extends MyLockingFeature {
1666         private boolean isTransient;
1667         private boolean freeLock = false;
1668
1669         public InvalidDbLockingFeature(boolean isTransient) {
1670             // pass "false" because we have to set the error code BEFORE calling
1671             // afterStart()
1672             super(false);
1673
1674             this.isTransient = isTransient;
1675
1676             this.beforeCreateLockManager(engine, new Properties());
1677             this.start();
1678             this.afterStart(engine);
1679         }
1680
1681         @Override
1682         protected BasicDataSource makeDataSource() throws Exception {
1683             when(datasrc.getConnection()).thenAnswer(answer -> {
1684                 if (freeLock) {
1685                     freeLock = false;
1686                     lock.free();
1687                 }
1688
1689                 throw makeEx();
1690             });
1691
1692             doThrow(makeEx()).when(datasrc).close();
1693
1694             return datasrc;
1695         }
1696
1697         protected SQLException makeEx() {
1698             if (isTransient) {
1699                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1700
1701             } else {
1702                 return new SQLException(EXPECTED_EXCEPTION);
1703             }
1704         }
1705     }
1706
1707     /**
1708      * Feature whose locks free themselves while free() is already running.
1709      */
1710     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1711         private boolean relock;
1712
1713         public FreeWithFreeLockingFeature(boolean relock) {
1714             super(true);
1715             this.relock = relock;
1716         }
1717
1718         @Override
1719         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1720                         LockCallback callback) {
1721
1722             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1723                 private static final long serialVersionUID = 1L;
1724                 private boolean checked = false;
1725
1726                 @Override
1727                 public boolean isUnavailable() {
1728                     if (checked) {
1729                         return super.isUnavailable();
1730                     }
1731
1732                     checked = true;
1733
1734                     // release and relock
1735                     free();
1736
1737                     if (relock) {
1738                         // run doUnlock
1739                         runLock(1, 0);
1740
1741                         // relock it
1742                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1743                     }
1744
1745                     return false;
1746                 }
1747             };
1748         }
1749     }
1750
1751     /**
1752      * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1753      * extend it, and then unlock it.
1754      */
1755     private class MyThread extends Thread {
1756         AssertionError err = null;
1757
1758         public MyThread() {
1759             setDaemon(true);
1760         }
1761
1762         @Override
1763         public void run() {
1764             try {
1765                 for (int x = 0; x < MAX_LOOPS; ++x) {
1766                     makeAttempt();
1767                 }
1768
1769             } catch (AssertionError e) {
1770                 err = e;
1771             }
1772         }
1773
1774         private void makeAttempt() {
1775             try {
1776                 Semaphore sem = new Semaphore(0);
1777
1778                 LockCallback cb = new LockCallback() {
1779                     @Override
1780                     public void lockAvailable(Lock lock) {
1781                         sem.release();
1782                     }
1783
1784                     @Override
1785                     public void lockUnavailable(Lock lock) {
1786                         sem.release();
1787                     }
1788                 };
1789
1790                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1791
1792                 // wait for callback, whether available or unavailable
1793                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1794                 if (!lock.isActive()) {
1795                     return;
1796                 }
1797
1798                 nsuccesses.incrementAndGet();
1799
1800                 assertEquals(1, nactive.incrementAndGet());
1801
1802                 lock.extend(HOLD_SEC2, cb);
1803                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1804                 assertTrue(lock.isActive());
1805
1806                 // decrement BEFORE free()
1807                 nactive.decrementAndGet();
1808
1809                 assertTrue(lock.free());
1810                 assertTrue(lock.isUnavailable());
1811
1812             } catch (InterruptedException e) {
1813                 Thread.currentThread().interrupt();
1814                 throw new AssertionError("interrupted", e);
1815             }
1816         }
1817     }
1818 }