59b562246f7d29b95dc02260206068505e080884
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.distributed.locking;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32 import static org.mockito.Matchers.any;
33 import static org.mockito.Matchers.anyLong;
34 import static org.mockito.Matchers.eq;
35 import static org.mockito.Mockito.doThrow;
36 import static org.mockito.Mockito.mock;
37 import static org.mockito.Mockito.never;
38 import static org.mockito.Mockito.times;
39 import static org.mockito.Mockito.verify;
40 import static org.mockito.Mockito.when;
41
42 import java.io.ByteArrayInputStream;
43 import java.io.ByteArrayOutputStream;
44 import java.io.ObjectInputStream;
45 import java.io.ObjectOutputStream;
46 import java.sql.Connection;
47 import java.sql.DriverManager;
48 import java.sql.PreparedStatement;
49 import java.sql.ResultSet;
50 import java.sql.SQLException;
51 import java.util.ArrayList;
52 import java.util.List;
53 import java.util.Properties;
54 import java.util.concurrent.Executors;
55 import java.util.concurrent.RejectedExecutionException;
56 import java.util.concurrent.ScheduledExecutorService;
57 import java.util.concurrent.Semaphore;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicBoolean;
60 import java.util.concurrent.atomic.AtomicInteger;
61 import java.util.concurrent.atomic.AtomicReference;
62 import org.apache.commons.dbcp2.BasicDataSource;
63 import org.junit.After;
64 import org.junit.AfterClass;
65 import org.junit.Before;
66 import org.junit.BeforeClass;
67 import org.junit.Test;
68 import org.mockito.ArgumentCaptor;
69 import org.mockito.Mock;
70 import org.mockito.MockitoAnnotations;
71 import org.onap.policy.common.utils.services.OrderedServiceImpl;
72 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
73 import org.onap.policy.drools.core.lock.Lock;
74 import org.onap.policy.drools.core.lock.LockCallback;
75 import org.onap.policy.drools.core.lock.LockState;
76 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
77 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
78 import org.onap.policy.drools.system.PolicyEngine;
79 import org.onap.policy.drools.system.PolicyEngineConstants;
80 import org.powermock.reflect.Whitebox;
81
82 public class DistributedLockManagerTest {
83     private static final long EXPIRE_SEC = 900L;
84     private static final long RETRY_SEC = 60L;
85     private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
86     private static final String OTHER_HOST = "other-host";
87     private static final String OTHER_OWNER = "other-owner";
88     private static final String EXPECTED_EXCEPTION = "expected exception";
89     private static final String DB_CONNECTION =
90                     "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
91     private static final String DB_USER = "user";
92     private static final String DB_PASSWORD = "password";
93     private static final String OWNER_KEY = "my key";
94     private static final String RESOURCE = "my resource";
95     private static final String RESOURCE2 = "my resource #2";
96     private static final String RESOURCE3 = "my resource #3";
97     private static final String RESOURCE4 = "my resource #4";
98     private static final String RESOURCE5 = "my resource #5";
99     private static final int HOLD_SEC = 100;
100     private static final int HOLD_SEC2 = 120;
101     private static final int MAX_THREADS = 5;
102     private static final int MAX_LOOPS = 100;
103     private static final int TRANSIENT = 500;
104     private static final int PERMANENT = 600;
105
106     // number of execute() calls before the first lock attempt
107     private static final int PRE_LOCK_EXECS = 1;
108
109     // number of execute() calls before the first schedule attempt
110     private static final int PRE_SCHED_EXECS = 1;
111
112     private static Connection conn = null;
113     private static ScheduledExecutorService saveExec;
114     private static ScheduledExecutorService realExec;
115
116     @Mock
117     private ScheduledExecutorService exsvc;
118
119     @Mock
120     private LockCallback callback;
121
122     @Mock
123     private BasicDataSource datasrc;
124
125     @Mock
126     private PolicyEngine engine;
127
128     private DistributedLock lock;
129
130     private AtomicInteger nactive;
131     private AtomicInteger nsuccesses;
132     private DistributedLockManager feature;
133
134
135     /**
136      * Configures the location of the property files and creates the DB.
137      *
138      * @throws SQLException if the DB cannot be created
139      */
140     @BeforeClass
141     public static void setUpBeforeClass() throws SQLException {
142         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
143
144         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
145
146         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
147                         + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
148                         + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
149             createStmt.executeUpdate();
150         }
151
152         saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
153
154         realExec = Executors.newScheduledThreadPool(3);
155         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
156     }
157
158     /**
159      * Restores static fields.
160      */
161     @AfterClass
162     public static void tearDownAfterClass() throws SQLException {
163         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
164         realExec.shutdown();
165         conn.close();
166     }
167
168     /**
169      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
170      * tasks.
171      *
172      * @throws SQLException if the lock records cannot be deleted from the DB
173      */
174     @Before
175     public void setUp() throws SQLException {
176         MockitoAnnotations.initMocks(this);
177
178         nactive = new AtomicInteger(0);
179         nsuccesses = new AtomicInteger(0);
180
181         cleanDb();
182
183         when(engine.getExecutorService()).thenReturn(exsvc);
184
185         feature = new MyLockingFeature(true);
186     }
187
188     @After
189     public void tearDown() throws SQLException {
190         shutdownFeature();
191         cleanDb();
192     }
193
194     private void cleanDb() throws SQLException {
195         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
196             stmt.executeUpdate();
197         }
198     }
199
200     private void shutdownFeature() {
201         if (feature != null) {
202             feature.afterStop(engine);
203             feature = null;
204         }
205     }
206
207     /**
208      * Tests that the feature is found in the expected service sets.
209      */
210     @Test
211     public void testServiceApis() {
212         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
213                         .anyMatch(obj -> obj instanceof DistributedLockManager));
214     }
215
216     /**
217      * Tests constructor() when properties are invalid.
218      */
219     @Test
220     public void testDistributedLockManagerInvalidProperties() {
221         // use properties containing an invalid value
222         Properties props = new Properties();
223         props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc");
224
225         feature = new MyLockingFeature(false) {
226             @Override
227             protected Properties getProperties(String fileName) {
228                 return props;
229             }
230         };
231
232         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
233     }
234
235     @Test
236     public void testGetSequenceNumber() {
237         assertEquals(1000, feature.getSequenceNumber());
238     }
239
240     @Test
241     public void testStartableApi() {
242         assertTrue(feature.isAlive());
243         assertTrue(feature.start());
244         assertTrue(feature.stop());
245         feature.shutdown();
246
247         // above should have had no effect
248         assertTrue(feature.isAlive());
249
250         feature.afterStop(engine);
251         assertFalse(feature.isAlive());
252     }
253
254     @Test
255     public void testLockApi() {
256         assertFalse(feature.isLocked());
257         assertTrue(feature.lock());
258         assertTrue(feature.unlock());
259     }
260
261     @Test
262     public void testBeforeCreateLockManager() {
263         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
264     }
265
266     /**
267      * Tests beforeCreate(), when getProperties() throws a runtime exception.
268      */
269     @Test
270     public void testBeforeCreateLockManagerEx() {
271         shutdownFeature();
272
273         feature = new MyLockingFeature(false) {
274             @Override
275             protected Properties getProperties(String fileName) {
276                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
277             }
278         };
279
280         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
281                         .isInstanceOf(DistributedLockManagerException.class);
282     }
283
284     @Test
285     public void testAfterStart() {
286         // verify that cleanup & expire check are both added to the queue
287         verify(exsvc).execute(any());
288         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
289     }
290
291     /**
292      * Tests afterStart(), when thread pool throws a runtime exception.
293      */
294     @Test
295     public void testAfterStartExInThreadPool() {
296         shutdownFeature();
297
298         feature = new MyLockingFeature(false);
299
300         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
301                         .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION));
302
303         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
304     }
305
306     @Test
307     public void testDeleteExpiredDbLocks() throws SQLException {
308         // add records: two expired, one not
309         insertRecord(RESOURCE, feature.getUuidString(), -1);
310         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
311         insertRecord(RESOURCE3, OTHER_OWNER, 0);
312         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
313
314         // get the clean-up function and execute it
315         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
316         verify(exsvc).execute(captor.capture());
317
318         long tbegin = System.currentTimeMillis();
319         Runnable action = captor.getValue();
320         action.run();
321
322         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
323         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
324         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
325         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
326
327         assertEquals(2, getRecordCount());
328     }
329
330     /**
331      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
332      *
333      * @throws SQLException if an error occurs
334      */
335     @Test
336     public void testDeleteExpiredDbLocksEx() throws SQLException {
337         feature = new InvalidDbLockingFeature(TRANSIENT);
338
339         // get the clean-up function and execute it
340         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
341         verify(exsvc).execute(captor.capture());
342
343         Runnable action = captor.getValue();
344
345         // should not throw an exception
346         action.run();
347     }
348
349     @Test
350     public void testAfterStop() {
351         shutdownFeature();
352
353         feature = new DistributedLockManager();
354
355         // shutdown without calling afterStart()
356
357         shutdownFeature();
358     }
359
360     /**
361      * Tests afterStop(), when the data source throws an exception when close() is called.
362      *
363      * @throws SQLException if an error occurs
364      */
365     @Test
366     public void testAfterStopEx() throws SQLException {
367         shutdownFeature();
368
369         // use a data source that throws an exception when closed
370         feature = new InvalidDbLockingFeature(TRANSIENT);
371
372         shutdownFeature();
373     }
374
375     @Test
376     public void testCreateLock() throws SQLException {
377         verify(exsvc).execute(any());
378
379         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
380         assertTrue(lock.isWaiting());
381
382         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
383
384         // this lock should fail
385         LockCallback callback2 = mock(LockCallback.class);
386         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
387         assertTrue(lock2.isUnavailable());
388         verify(callback2, never()).lockAvailable(lock2);
389         verify(callback2).lockUnavailable(lock2);
390
391         // this should fail, too
392         LockCallback callback3 = mock(LockCallback.class);
393         DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
394         assertTrue(lock3.isUnavailable());
395         verify(callback3, never()).lockAvailable(lock3);
396         verify(callback3).lockUnavailable(lock3);
397
398         // no change to first
399         assertTrue(lock.isWaiting());
400
401         // no callbacks to the first lock
402         verify(callback, never()).lockAvailable(lock);
403         verify(callback, never()).lockUnavailable(lock);
404
405         assertTrue(lock.isWaiting());
406         assertEquals(0, getRecordCount());
407
408         runLock(0, 0);
409         assertTrue(lock.isActive());
410         assertEquals(1, getRecordCount());
411
412         verify(callback).lockAvailable(lock);
413         verify(callback, never()).lockUnavailable(lock);
414
415         // this should succeed
416         DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
417         assertTrue(lock4.isWaiting());
418
419         // after running checker, original records should still remain
420         runChecker(0, 0, EXPIRE_SEC);
421         assertEquals(1, getRecordCount());
422         verify(callback, never()).lockUnavailable(lock);
423     }
424
425     /**
426      * Tests lock() when the feature is not the latest instance.
427      */
428     @Test
429     public void testCreateLockNotLatestInstance() {
430         DistributedLockManager.setLatestInstance(null);
431
432         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
433         assertTrue(lock.isUnavailable());
434         verify(callback, never()).lockAvailable(any());
435         verify(callback).lockUnavailable(lock);
436     }
437
438     @Test
439     public void testCheckExpired() throws SQLException {
440         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
441         runLock(0, 0);
442
443         LockCallback callback2 = mock(LockCallback.class);
444         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
445         runLock(1, 0);
446
447         LockCallback callback3 = mock(LockCallback.class);
448         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
449         runLock(2, 0);
450
451         LockCallback callback4 = mock(LockCallback.class);
452         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
453         runLock(3, 0);
454
455         LockCallback callback5 = mock(LockCallback.class);
456         final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
457         runLock(4, 0);
458
459         assertEquals(5, getRecordCount());
460
461         // expire one record
462         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
463
464         // change host of another record
465         updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
466
467         // change uuid of another record
468         updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
469
470
471         // run the checker
472         runChecker(0, 0, EXPIRE_SEC);
473
474
475         // check lock states
476         assertTrue(lock.isUnavailable());
477         assertTrue(lock2.isActive());
478         assertTrue(lock3.isUnavailable());
479         assertTrue(lock4.isActive());
480         assertTrue(lock5.isUnavailable());
481
482         // allow callbacks
483         runLock(5, 2);
484         runLock(6, 1);
485         runLock(7, 0);
486         verify(callback).lockUnavailable(lock);
487         verify(callback3).lockUnavailable(lock3);
488         verify(callback5).lockUnavailable(lock5);
489
490         verify(callback2, never()).lockUnavailable(lock2);
491         verify(callback4, never()).lockUnavailable(lock4);
492
493
494         // another check should have been scheduled, with the normal interval
495         runChecker(1, 0, EXPIRE_SEC);
496     }
497
498     /**
499      * Tests checkExpired(), when schedule() throws an exception.
500      */
501     @Test
502     public void testCheckExpiredExecRejected() {
503         // arrange for execution to be rejected
504         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
505                         .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
506
507         runChecker(0, 0, EXPIRE_SEC);
508     }
509
510     /**
511      * Tests checkExpired(), when getConnection() throws an exception.
512      */
513     @Test
514     public void testCheckExpiredSqlEx() {
515         // use a data source that throws an exception when getConnection() is called
516         feature = new InvalidDbLockingFeature(TRANSIENT);
517
518         runChecker(0, 0, EXPIRE_SEC);
519
520         // it should have scheduled another check, sooner
521         runChecker(0, 0, RETRY_SEC);
522     }
523
524     @Test
525     public void testExpireLocks() throws SQLException {
526         AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
527
528         feature = new MyLockingFeature(true) {
529             @Override
530             protected BasicDataSource makeDataSource() throws Exception {
531                 // get the real data source
532                 BasicDataSource src2 = super.makeDataSource();
533
534                 when(datasrc.getConnection()).thenAnswer(answer -> {
535                     DistributedLock lck = freeLock.getAndSet(null);
536                     if (lck != null) {
537                         // free it
538                         lck.free();
539
540                         // run its doUnlock
541                         runLock(4, 0);
542                     }
543
544                     return src2.getConnection();
545                 });
546
547                 return datasrc;
548             }
549         };
550
551         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
552         runLock(0, 0);
553
554         LockCallback callback2 = mock(LockCallback.class);
555         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
556         runLock(1, 0);
557
558         LockCallback callback3 = mock(LockCallback.class);
559         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
560         // don't run doLock for lock3 - leave it in the waiting state
561
562         LockCallback callback4 = mock(LockCallback.class);
563         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
564         runLock(3, 0);
565
566         assertEquals(3, getRecordCount());
567
568         // expire one record
569         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
570
571         // arrange to free lock4 while the checker is running
572         freeLock.set(lock4);
573
574         // run the checker
575         runChecker(0, 0, EXPIRE_SEC);
576
577
578         // check lock states
579         assertTrue(lock.isUnavailable());
580         assertTrue(lock2.isActive());
581         assertTrue(lock3.isWaiting());
582         assertTrue(lock4.isUnavailable());
583
584         runLock(5, 0);
585         verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any());
586
587         verify(callback).lockUnavailable(lock);
588         verify(callback2, never()).lockUnavailable(lock2);
589         verify(callback3, never()).lockUnavailable(lock3);
590         verify(callback4, never()).lockUnavailable(lock4);
591     }
592
593     @Test
594     public void testDistributedLockNoArgs() {
595         DistributedLock lock = new DistributedLock();
596         assertNull(lock.getResourceId());
597         assertNull(lock.getOwnerKey());
598         assertNull(lock.getCallback());
599         assertEquals(0, lock.getHoldSec());
600     }
601
602     @Test
603     public void testDistributedLock() {
604         assertThatIllegalArgumentException()
605                         .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
606                         .withMessageContaining("holdSec");
607
608         // should generate no exception
609         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
610     }
611
612     @Test
613     public void testDistributedLockSerializable() throws Exception {
614         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
615         lock = roundTrip(lock);
616
617         assertTrue(lock.isWaiting());
618
619         assertEquals(RESOURCE, lock.getResourceId());
620         assertEquals(OWNER_KEY, lock.getOwnerKey());
621         assertNull(lock.getCallback());
622         assertEquals(HOLD_SEC, lock.getHoldSec());
623     }
624
625     @Test
626     public void testGrant() {
627         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
628         assertFalse(lock.isActive());
629
630         // execute the doLock() call
631         runLock(0, 0);
632
633         assertTrue(lock.isActive());
634
635         // the callback for the lock should have been run in the foreground thread
636         verify(callback).lockAvailable(lock);
637     }
638
639     /**
640      * Tests grant() when the lock is already unavailable.
641      */
642     @Test
643     public void testDistributedLockGrantUnavailable() {
644         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
645         lock.setState(LockState.UNAVAILABLE);
646         lock.grant();
647
648         assertTrue(lock.isUnavailable());
649         verify(callback, never()).lockAvailable(any());
650         verify(callback, never()).lockUnavailable(any());
651     }
652
653     @Test
654     public void testDistributedLockDeny() {
655         // get a lock
656         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
657
658         // get another lock - should fail
659         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
660
661         assertTrue(lock.isUnavailable());
662
663         // the callback for the second lock should have been run in the foreground thread
664         verify(callback).lockUnavailable(lock);
665
666         // should only have a request for the first lock
667         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
668     }
669
670     @Test
671     public void testDistributedLockFree() {
672         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
673
674         assertTrue(lock.free());
675         assertTrue(lock.isUnavailable());
676
677         // run both requests associated with the lock
678         runLock(0, 1);
679         runLock(1, 0);
680
681         // should not have changed state
682         assertTrue(lock.isUnavailable());
683
684         // attempt to free it again
685         assertFalse(lock.free());
686
687         // should not have queued anything else
688         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
689
690         // new lock should succeed
691         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
692         assertTrue(lock2 != lock);
693         assertTrue(lock2.isWaiting());
694     }
695
696     /**
697      * Tests that free() works on a serialized lock with a new feature.
698      *
699      * @throws Exception if an error occurs
700      */
701     @Test
702     public void testDistributedLockFreeSerialized() throws Exception {
703         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
704
705         feature = new MyLockingFeature(true);
706
707         lock = roundTrip(lock);
708         assertTrue(lock.free());
709         assertTrue(lock.isUnavailable());
710     }
711
712     /**
713      * Tests free() on a serialized lock without a feature.
714      *
715      * @throws Exception if an error occurs
716      */
717     @Test
718     public void testDistributedLockFreeNoFeature() throws Exception {
719         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
720
721         DistributedLockManager.setLatestInstance(null);
722
723         lock = roundTrip(lock);
724         assertFalse(lock.free());
725         assertTrue(lock.isUnavailable());
726     }
727
728     /**
729      * Tests the case where the lock is freed and doUnlock called between the call to
730      * isUnavailable() and the call to compute().
731      */
732     @Test
733     public void testDistributedLockFreeUnlocked() {
734         feature = new FreeWithFreeLockingFeature(true);
735
736         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
737
738         assertFalse(lock.free());
739         assertTrue(lock.isUnavailable());
740     }
741
742     /**
743      * Tests the case where the lock is freed, but doUnlock is not completed, between the
744      * call to isUnavailable() and the call to compute().
745      */
746     @Test
747     public void testDistributedLockFreeLockFreed() {
748         feature = new FreeWithFreeLockingFeature(false);
749
750         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
751
752         assertFalse(lock.free());
753         assertTrue(lock.isUnavailable());
754     }
755
756     @Test
757     public void testDistributedLockExtend() {
758         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
759
760         // lock2 should be denied - called back by this thread
761         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
762         verify(callback, never()).lockAvailable(lock2);
763         verify(callback).lockUnavailable(lock2);
764
765         // lock2 will still be denied - called back by this thread
766         lock2.extend(HOLD_SEC, callback);
767         verify(callback, times(2)).lockUnavailable(lock2);
768
769         // force lock2 to be active - should still be denied
770         Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
771         lock2.extend(HOLD_SEC, callback);
772         verify(callback, times(3)).lockUnavailable(lock2);
773
774         assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
775                         .withMessageContaining("holdSec");
776
777         // execute doLock()
778         runLock(0, 0);
779         assertTrue(lock.isActive());
780
781         // now extend the first lock
782         LockCallback callback2 = mock(LockCallback.class);
783         lock.extend(HOLD_SEC2, callback2);
784         assertTrue(lock.isWaiting());
785
786         // execute doExtend()
787         runLock(1, 0);
788         lock.extend(HOLD_SEC2, callback2);
789         assertEquals(HOLD_SEC2, lock.getHoldSec());
790         verify(callback2).lockAvailable(lock);
791         verify(callback2, never()).lockUnavailable(lock);
792     }
793
794     /**
795      * Tests that extend() works on a serialized lock with a new feature.
796      *
797      * @throws Exception if an error occurs
798      */
799     @Test
800     public void testDistributedLockExtendSerialized() throws Exception {
801         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
802
803         // run doLock
804         runLock(0, 0);
805         assertTrue(lock.isActive());
806
807         feature = new MyLockingFeature(true);
808
809         lock = roundTrip(lock);
810         assertTrue(lock.isActive());
811
812         LockCallback scallback = mock(LockCallback.class);
813
814         lock.extend(HOLD_SEC, scallback);
815         assertTrue(lock.isWaiting());
816
817         // run doExtend (in new feature)
818         runLock(0, 0);
819         assertTrue(lock.isActive());
820
821         verify(scallback).lockAvailable(lock);
822         verify(scallback, never()).lockUnavailable(lock);
823     }
824
825     /**
826      * Tests extend() on a serialized lock without a feature.
827      *
828      * @throws Exception if an error occurs
829      */
830     @Test
831     public void testDistributedLockExtendNoFeature() throws Exception {
832         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
833
834         // run doLock
835         runLock(0, 0);
836         assertTrue(lock.isActive());
837
838         DistributedLockManager.setLatestInstance(null);
839
840         lock = roundTrip(lock);
841         assertTrue(lock.isActive());
842
843         LockCallback scallback = mock(LockCallback.class);
844
845         lock.extend(HOLD_SEC, scallback);
846         assertTrue(lock.isUnavailable());
847
848         verify(scallback, never()).lockAvailable(lock);
849         verify(scallback).lockUnavailable(lock);
850     }
851
852     /**
853      * Tests the case where the lock is freed and doUnlock called between the call to
854      * isUnavailable() and the call to compute().
855      */
856     @Test
857     public void testDistributedLockExtendUnlocked() {
858         feature = new FreeWithFreeLockingFeature(true);
859
860         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
861
862         lock.extend(HOLD_SEC2, callback);
863         assertTrue(lock.isUnavailable());
864         verify(callback).lockUnavailable(lock);
865     }
866
867     /**
868      * Tests the case where the lock is freed, but doUnlock is not completed, between the
869      * call to isUnavailable() and the call to compute().
870      */
871     @Test
872     public void testDistributedLockExtendLockFreed() {
873         feature = new FreeWithFreeLockingFeature(false);
874
875         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
876
877         lock.extend(HOLD_SEC2, callback);
878         assertTrue(lock.isUnavailable());
879         verify(callback).lockUnavailable(lock);
880     }
881
882     @Test
883     public void testDistributedLockScheduleRequest() {
884         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
885         runLock(0, 0);
886
887         verify(callback).lockAvailable(lock);
888     }
889
890     @Test
891     public void testDistributedLockRescheduleRequest() throws SQLException {
892         // use a data source that throws an exception when getConnection() is called
893         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
894         feature = invfeat;
895
896         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
897
898         // invoke doLock - should fail and reschedule
899         runLock(0, 0);
900
901         // should still be waiting
902         assertTrue(lock.isWaiting());
903         verify(callback, never()).lockUnavailable(lock);
904
905         // free the lock while doLock is executing
906         invfeat.freeLock = true;
907
908         // try scheduled request - should just invoke doUnlock
909         runSchedule(0, 0);
910
911         // should still be waiting
912         assertTrue(lock.isUnavailable());
913         verify(callback, never()).lockUnavailable(lock);
914
915         // should have scheduled a retry of doUnlock
916         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
917     }
918
919     @Test
920     public void testDistributedLockGetNextRequest() {
921         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
922
923         /*
924          * run doLock. This should cause getNextRequest() to be called twice, once with a
925          * request in the queue, and the second time with request=null.
926          */
927         runLock(0, 0);
928     }
929
930     /**
931      * Tests getNextRequest(), where the same request is still in the queue the second
932      * time it's called.
933      */
934     @Test
935     public void testDistributedLockGetNextRequestSameRequest() {
936         // force reschedule to be invoked
937         feature = new InvalidDbLockingFeature(TRANSIENT);
938
939         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
940
941         /*
942          * run doLock. This should cause getNextRequest() to be called twice, once with a
943          * request in the queue, and the second time with the same request again.
944          */
945         runLock(0, 0);
946
947         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
948     }
949
950     @Test
951     public void testDistributedLockDoRequest() throws SQLException {
952         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
953
954         assertTrue(lock.isWaiting());
955
956         // run doLock via doRequest
957         runLock(0, 0);
958
959         assertTrue(lock.isActive());
960     }
961
962     /**
963      * Tests doRequest(), when doRequest() is already running within another thread.
964      */
965     @Test
966     public void testDistributedLockDoRequestBusy() {
967         /*
968          * this feature will invoke a request in a background thread while it's being run
969          * in a foreground thread.
970          */
971         AtomicBoolean running = new AtomicBoolean(false);
972         AtomicBoolean returned = new AtomicBoolean(false);
973
974         feature = new MyLockingFeature(true) {
975             @Override
976             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
977                             LockCallback callback) {
978                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
979                     private static final long serialVersionUID = 1L;
980
981                     @Override
982                     protected boolean doDbInsert(Connection conn) throws SQLException {
983                         if (running.get()) {
984                             // already inside the thread - don't recurse any further
985                             return super.doDbInsert(conn);
986                         }
987
988                         running.set(true);
989
990                         Thread thread = new Thread(() -> {
991                             // run doLock from within the new thread
992                             runLock(0, 0);
993                         });
994                         thread.setDaemon(true);
995                         thread.start();
996
997                         // wait for the background thread to complete before continuing
998                         try {
999                             thread.join(5000);
1000                         } catch (InterruptedException ignore) {
1001                             Thread.currentThread().interrupt();
1002                         }
1003
1004                         returned.set(!thread.isAlive());
1005
1006                         return super.doDbInsert(conn);
1007                     }
1008                 };
1009             }
1010         };
1011
1012         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1013
1014         // run doLock
1015         runLock(0, 0);
1016
1017         assertTrue(returned.get());
1018     }
1019
1020     /**
1021      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1022      *
1023      * @throws SQLException if an error occurs
1024      */
1025     @Test
1026     public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1027         // throw run-time exception
1028         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1029
1030         // use a data source that throws an exception when getConnection() is called
1031         feature = new MyLockingFeature(true) {
1032             @Override
1033             protected BasicDataSource makeDataSource() throws Exception {
1034                 return datasrc;
1035             }
1036         };
1037
1038         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1039
1040         // invoke doLock - should NOT reschedule
1041         runLock(0, 0);
1042
1043         assertTrue(lock.isUnavailable());
1044         verify(callback).lockUnavailable(lock);
1045
1046         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1047     }
1048
1049     /**
1050      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1051      * state.
1052      *
1053      * @throws SQLException if an error occurs
1054      */
1055     @Test
1056     public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1057         // throw run-time exception
1058         when(datasrc.getConnection()).thenAnswer(answer -> {
1059             lock.free();
1060             throw new IllegalStateException(EXPECTED_EXCEPTION);
1061         });
1062
1063         // use a data source that throws an exception when getConnection() is called
1064         feature = new MyLockingFeature(true) {
1065             @Override
1066             protected BasicDataSource makeDataSource() throws Exception {
1067                 return datasrc;
1068             }
1069         };
1070
1071         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1072
1073         // invoke doLock - should NOT reschedule
1074         runLock(0, 0);
1075
1076         assertTrue(lock.isUnavailable());
1077         verify(callback, never()).lockUnavailable(lock);
1078
1079         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1080     }
1081
1082     /**
1083      * Tests doRequest() when the retry count gets exhausted.
1084      */
1085     @Test
1086     public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1087         // use a data source that throws an exception when getConnection() is called
1088         feature = new InvalidDbLockingFeature(TRANSIENT);
1089
1090         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1091
1092         // invoke doLock - should fail and reschedule
1093         runLock(0, 0);
1094
1095         // should still be waiting
1096         assertTrue(lock.isWaiting());
1097         verify(callback, never()).lockUnavailable(lock);
1098
1099         // try again, via SCHEDULER - first retry fails
1100         runSchedule(0, 0);
1101
1102         // should still be waiting
1103         assertTrue(lock.isWaiting());
1104         verify(callback, never()).lockUnavailable(lock);
1105
1106         // try again, via SCHEDULER - final retry fails
1107         runSchedule(1, 0);
1108         assertTrue(lock.isUnavailable());
1109
1110         // now callback should have been called
1111         verify(callback).lockUnavailable(lock);
1112     }
1113
1114     /**
1115      * Tests doRequest() when a non-transient DB exception is thrown.
1116      */
1117     @Test
1118     public void testDistributedLockDoRequestNotTransient() {
1119         /*
1120          * use a data source that throws a PERMANENT exception when getConnection() is
1121          * called
1122          */
1123         feature = new InvalidDbLockingFeature(PERMANENT);
1124
1125         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1126
1127         // invoke doLock - should fail
1128         runLock(0, 0);
1129
1130         assertTrue(lock.isUnavailable());
1131         verify(callback).lockUnavailable(lock);
1132
1133         // should not have scheduled anything new
1134         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1135         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1136     }
1137
1138     @Test
1139     public void testDistributedLockDoLock() throws SQLException {
1140         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1141
1142         // invoke doLock - should simply do an insert
1143         long tbegin = System.currentTimeMillis();
1144         runLock(0, 0);
1145
1146         assertEquals(1, getRecordCount());
1147         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1148         verify(callback).lockAvailable(lock);
1149     }
1150
1151     /**
1152      * Tests doLock() when the lock is freed before doLock runs.
1153      *
1154      * @throws SQLException if an error occurs
1155      */
1156     @Test
1157     public void testDistributedLockDoLockFreed() throws SQLException {
1158         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1159
1160         lock.setState(LockState.UNAVAILABLE);
1161
1162         // invoke doLock - should do nothing
1163         runLock(0, 0);
1164
1165         assertEquals(0, getRecordCount());
1166
1167         verify(callback, never()).lockAvailable(lock);
1168     }
1169
1170     /**
1171      * Tests doLock() when a DB exception is thrown.
1172      */
1173     @Test
1174     public void testDistributedLockDoLockEx() {
1175         // use a data source that throws an exception when getConnection() is called
1176         feature = new InvalidDbLockingFeature(PERMANENT);
1177
1178         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1179
1180         // invoke doLock - should simply do an insert
1181         runLock(0, 0);
1182
1183         // lock should have failed due to exception
1184         verify(callback).lockUnavailable(lock);
1185     }
1186
1187     /**
1188      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1189      * to be called.
1190      */
1191     @Test
1192     public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1193         // insert an expired record
1194         insertRecord(RESOURCE, feature.getUuidString(), 0);
1195
1196         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1197
1198         // invoke doLock - should simply do an update
1199         runLock(0, 0);
1200         verify(callback).lockAvailable(lock);
1201     }
1202
1203     /**
1204      * Tests doLock() when a locked record already exists.
1205      */
1206     @Test
1207     public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1208         // insert an expired record
1209         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1210
1211         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1212
1213         // invoke doLock
1214         runLock(0, 0);
1215
1216         // lock should have failed because it's already locked
1217         verify(callback).lockUnavailable(lock);
1218     }
1219
1220     @Test
1221     public void testDistributedLockDoUnlock() throws SQLException {
1222         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1223
1224         // invoke doLock()
1225         runLock(0, 0);
1226
1227         lock.free();
1228
1229         // invoke doUnlock()
1230         long tbegin = System.currentTimeMillis();
1231         runLock(1, 0);
1232
1233         assertEquals(0, getRecordCount());
1234         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1235
1236         assertTrue(lock.isUnavailable());
1237
1238         // no more callbacks should have occurred
1239         verify(callback, times(1)).lockAvailable(lock);
1240         verify(callback, never()).lockUnavailable(lock);
1241     }
1242
1243     /**
1244      * Tests doUnlock() when a DB exception is thrown.
1245      *
1246      * @throws SQLException if an error occurs
1247      */
1248     @Test
1249     public void testDistributedLockDoUnlockEx() throws SQLException {
1250         feature = new InvalidDbLockingFeature(PERMANENT);
1251
1252         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1253
1254         // do NOT invoke doLock() - it will fail without a DB connection
1255
1256         lock.free();
1257
1258         // invoke doUnlock()
1259         runLock(1, 0);
1260
1261         assertTrue(lock.isUnavailable());
1262
1263         // no more callbacks should have occurred
1264         verify(callback, never()).lockAvailable(lock);
1265         verify(callback, never()).lockUnavailable(lock);
1266     }
1267
1268     @Test
1269     public void testDistributedLockDoExtend() throws SQLException {
1270         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1271         runLock(0, 0);
1272
1273         LockCallback callback2 = mock(LockCallback.class);
1274         lock.extend(HOLD_SEC2, callback2);
1275
1276         // call doExtend()
1277         long tbegin = System.currentTimeMillis();
1278         runLock(1, 0);
1279
1280         assertEquals(1, getRecordCount());
1281         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1282
1283         assertTrue(lock.isActive());
1284
1285         // no more callbacks should have occurred
1286         verify(callback).lockAvailable(lock);
1287         verify(callback, never()).lockUnavailable(lock);
1288
1289         // extension should have succeeded
1290         verify(callback2).lockAvailable(lock);
1291         verify(callback2, never()).lockUnavailable(lock);
1292     }
1293
1294     /**
1295      * Tests doExtend() when the lock is freed before doExtend runs.
1296      *
1297      * @throws SQLException if an error occurs
1298      */
1299     @Test
1300     public void testDistributedLockDoExtendFreed() throws SQLException {
1301         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1302         lock.extend(HOLD_SEC2, callback);
1303
1304         lock.setState(LockState.UNAVAILABLE);
1305
1306         // invoke doExtend - should do nothing
1307         runLock(1, 0);
1308
1309         assertEquals(0, getRecordCount());
1310
1311         verify(callback, never()).lockAvailable(lock);
1312     }
1313
1314     /**
1315      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1316      * insert.
1317      *
1318      * @throws SQLException if an error occurs
1319      */
1320     @Test
1321     public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1322         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1323         runLock(0, 0);
1324
1325         LockCallback callback2 = mock(LockCallback.class);
1326         lock.extend(HOLD_SEC2, callback2);
1327
1328         // delete the record so it's forced to re-insert it
1329         cleanDb();
1330
1331         // call doExtend()
1332         long tbegin = System.currentTimeMillis();
1333         runLock(1, 0);
1334
1335         assertEquals(1, getRecordCount());
1336         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1337
1338         assertTrue(lock.isActive());
1339
1340         // no more callbacks should have occurred
1341         verify(callback).lockAvailable(lock);
1342         verify(callback, never()).lockUnavailable(lock);
1343
1344         // extension should have succeeded
1345         verify(callback2).lockAvailable(lock);
1346         verify(callback2, never()).lockUnavailable(lock);
1347     }
1348
1349     /**
1350      * Tests doExtend() when both update and insert fail.
1351      *
1352      * @throws SQLException if an error occurs
1353      */
1354     @Test
1355     public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1356         /*
1357          * this feature will create a lock that returns false when doDbUpdate() is
1358          * invoked, or when doDbInsert() is invoked a second time
1359          */
1360         feature = new MyLockingFeature(true) {
1361             @Override
1362             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1363                             LockCallback callback) {
1364                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1365                     private static final long serialVersionUID = 1L;
1366                     private int ntimes = 0;
1367
1368                     @Override
1369                     protected boolean doDbInsert(Connection conn) throws SQLException {
1370                         if (ntimes++ > 0) {
1371                             return false;
1372                         }
1373
1374                         return super.doDbInsert(conn);
1375                     }
1376
1377                     @Override
1378                     protected boolean doDbUpdate(Connection conn) {
1379                         return false;
1380                     }
1381                 };
1382             }
1383         };
1384
1385         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1386         runLock(0, 0);
1387
1388         LockCallback callback2 = mock(LockCallback.class);
1389         lock.extend(HOLD_SEC2, callback2);
1390
1391         // call doExtend()
1392         runLock(1, 0);
1393
1394         assertTrue(lock.isUnavailable());
1395
1396         // no more callbacks should have occurred
1397         verify(callback).lockAvailable(lock);
1398         verify(callback, never()).lockUnavailable(lock);
1399
1400         // extension should have failed
1401         verify(callback2, never()).lockAvailable(lock);
1402         verify(callback2).lockUnavailable(lock);
1403     }
1404
1405     /**
1406      * Tests doExtend() when an exception occurs.
1407      *
1408      * @throws SQLException if an error occurs
1409      */
1410     @Test
1411     public void testDistributedLockDoExtendEx() throws SQLException {
1412         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1413         runLock(0, 0);
1414
1415         /*
1416          * delete the record and insert one with a different owner, which will cause
1417          * doDbInsert() to throw an exception
1418          */
1419         cleanDb();
1420         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1421
1422         LockCallback callback2 = mock(LockCallback.class);
1423         lock.extend(HOLD_SEC2, callback2);
1424
1425         // call doExtend()
1426         runLock(1, 0);
1427
1428         assertTrue(lock.isUnavailable());
1429
1430         // no more callbacks should have occurred
1431         verify(callback).lockAvailable(lock);
1432         verify(callback, never()).lockUnavailable(lock);
1433
1434         // extension should have failed
1435         verify(callback2, never()).lockAvailable(lock);
1436         verify(callback2).lockUnavailable(lock);
1437     }
1438
1439     @Test
1440     public void testDistributedLockToString() {
1441         String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1442         assertNotNull(text);
1443         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1444     }
1445
1446     @Test
1447     public void testMakeThreadPool() {
1448         // use a REAL feature to test this
1449         feature = new DistributedLockManager();
1450
1451         // this should create a thread pool
1452         feature.beforeCreateLockManager(engine, new Properties());
1453         feature.afterStart(engine);
1454
1455         shutdownFeature();
1456     }
1457
1458     /**
1459      * Performs a multi-threaded test of the locking facility.
1460      *
1461      * @throws InterruptedException if the current thread is interrupted while waiting for
1462      *         the background threads to complete
1463      */
1464     @Test
1465     public void testMultiThreaded() throws InterruptedException {
1466         feature = new DistributedLockManager();
1467         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1468         feature.afterStart(PolicyEngineConstants.getManager());
1469
1470         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1471         for (int x = 0; x < MAX_THREADS; ++x) {
1472             threads.add(new MyThread());
1473         }
1474
1475         threads.forEach(Thread::start);
1476
1477         for (MyThread thread : threads) {
1478             thread.join(6000);
1479             assertFalse(thread.isAlive());
1480         }
1481
1482         for (MyThread thread : threads) {
1483             if (thread.err != null) {
1484                 throw thread.err;
1485             }
1486         }
1487
1488         assertTrue(nsuccesses.get() > 0);
1489     }
1490
1491     private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1492                     boolean waitForLock) {
1493         return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1494     }
1495
1496     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1497         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1498         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1499             oos.writeObject(lock);
1500         }
1501
1502         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1503         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1504             return (DistributedLock) ois.readObject();
1505         }
1506     }
1507
1508     /**
1509      * Runs the checkExpired() action.
1510      *
1511      * @param nskip number of actions in the work queue to skip
1512      * @param nadditional number of additional actions that appear in the work queue
1513      *        <i>after</i> the checkExpired action
1514      * @param schedSec number of seconds for which the checker should have been scheduled
1515      */
1516     private void runChecker(int nskip, int nadditional, long schedSec) {
1517         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1518         verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1519         Runnable action = captor.getAllValues().get(nskip);
1520         action.run();
1521     }
1522
1523     /**
1524      * Runs a lock action (e.g., doLock, doUnlock).
1525      *
1526      * @param nskip number of actions in the work queue to skip
1527      * @param nadditional number of additional actions that appear in the work queue
1528      *        <i>after</i> the desired action
1529      */
1530     void runLock(int nskip, int nadditional) {
1531         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1532         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1533
1534         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1535         action.run();
1536     }
1537
1538     /**
1539      * Runs a scheduled action (e.g., "retry" action).
1540      *
1541      * @param nskip number of actions in the work queue to skip
1542      * @param nadditional number of additional actions that appear in the work queue
1543      *        <i>after</i> the desired action
1544      */
1545     void runSchedule(int nskip, int nadditional) {
1546         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1547         verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1548
1549         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1550         action.run();
1551     }
1552
1553     /**
1554      * Gets a count of the number of lock records in the DB.
1555      *
1556      * @return the number of lock records in the DB
1557      * @throws SQLException if an error occurs accessing the DB
1558      */
1559     private int getRecordCount() throws SQLException {
1560         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1561                         ResultSet result = stmt.executeQuery()) {
1562
1563             if (result.next()) {
1564                 return result.getInt(1);
1565
1566             } else {
1567                 return 0;
1568             }
1569         }
1570     }
1571
1572     /**
1573      * Determines if there is a record for the given resource whose expiration time is in
1574      * the expected range.
1575      *
1576      * @param resourceId ID of the resource of interest
1577      * @param uuidString UUID string of the owner
1578      * @param holdSec seconds for which the lock was to be held
1579      * @param tbegin earliest time, in milliseconds, at which the record could have been
1580      *        inserted into the DB
1581      * @return {@code true} if a record is found, {@code false} otherwise
1582      * @throws SQLException if an error occurs accessing the DB
1583      */
1584     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1585         try (PreparedStatement stmt =
1586                         conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1587                                         + " WHERE resourceId=? AND host=? AND owner=?")) {
1588
1589             stmt.setString(1, resourceId);
1590             stmt.setString(2, feature.getHostName());
1591             stmt.setString(3, uuidString);
1592
1593             try (ResultSet result = stmt.executeQuery()) {
1594                 if (result.next()) {
1595                     int remaining = result.getInt(1);
1596                     long maxDiff = System.currentTimeMillis() - tbegin;
1597                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1598
1599                 } else {
1600                     return false;
1601                 }
1602             }
1603         }
1604     }
1605
1606     /**
1607      * Inserts a record into the DB.
1608      *
1609      * @param resourceId ID of the resource of interest
1610      * @param uuidString UUID string of the owner
1611      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1612      * @throws SQLException if an error occurs accessing the DB
1613      */
1614     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1615         this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
1616     }
1617
1618     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1619                     throws SQLException {
1620         try (PreparedStatement stmt =
1621                         conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1622                                         + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1623
1624             stmt.setString(1, resourceId);
1625             stmt.setString(2, hostName);
1626             stmt.setString(3, uuidString);
1627             stmt.setInt(4, expireOffset);
1628
1629             assertEquals(1, stmt.executeUpdate());
1630         }
1631     }
1632
1633     /**
1634      * Updates a record in the DB.
1635      *
1636      * @param resourceId ID of the resource of interest
1637      * @param newUuid UUID string of the <i>new</i> owner
1638      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1639      * @throws SQLException if an error occurs accessing the DB
1640      */
1641     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1642         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1643                         + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1644
1645             stmt.setString(1, newHost);
1646             stmt.setString(2, newUuid);
1647             stmt.setInt(3, expireOffset);
1648             stmt.setString(4, resourceId);
1649
1650             assertEquals(1, stmt.executeUpdate());
1651         }
1652     }
1653
1654     /**
1655      * Feature that uses <i>exsvc</i> to execute requests.
1656      */
1657     private class MyLockingFeature extends DistributedLockManager {
1658
1659         public MyLockingFeature(boolean init) {
1660             shutdownFeature();
1661
1662             exsvc = mock(ScheduledExecutorService.class);
1663             when(engine.getExecutorService()).thenReturn(exsvc);
1664
1665             if (init) {
1666                 beforeCreateLockManager(engine, new Properties());
1667                 afterStart(engine);
1668             }
1669         }
1670     }
1671
1672     /**
1673      * Feature whose data source all throws exceptions.
1674      */
1675     private class InvalidDbLockingFeature extends MyLockingFeature {
1676         private int errcode;
1677         private boolean freeLock = false;
1678
1679         public InvalidDbLockingFeature(int errcode) {
1680             // pass "false" because we have to set the error code BEFORE calling
1681             // afterStart()
1682             super(false);
1683
1684             this.errcode = errcode;
1685
1686             this.beforeCreateLockManager(engine, new Properties());
1687             this.afterStart(engine);
1688         }
1689
1690         @Override
1691         protected BasicDataSource makeDataSource() throws Exception {
1692             when(datasrc.getConnection()).thenAnswer(answer -> {
1693                 if (freeLock) {
1694                     freeLock = false;
1695                     lock.free();
1696                 }
1697
1698                 throw new SQLException(EXPECTED_EXCEPTION, "", errcode);
1699             });
1700
1701             doThrow(new SQLException(EXPECTED_EXCEPTION, "", errcode)).when(datasrc).close();
1702
1703             return datasrc;
1704         }
1705     }
1706
1707     /**
1708      * Feature whose locks free themselves while free() is already running.
1709      */
1710     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1711         private boolean relock;
1712
1713         public FreeWithFreeLockingFeature(boolean relock) {
1714             super(true);
1715             this.relock = relock;
1716         }
1717
1718         @Override
1719         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1720                         LockCallback callback) {
1721
1722             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1723                 private static final long serialVersionUID = 1L;
1724                 private boolean checked = false;
1725
1726                 @Override
1727                 public boolean isUnavailable() {
1728                     if (checked) {
1729                         return super.isUnavailable();
1730                     }
1731
1732                     checked = true;
1733
1734                     // release and relock
1735                     free();
1736
1737                     if (relock) {
1738                         // run doUnlock
1739                         runLock(1, 0);
1740
1741                         // relock it
1742                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1743                     }
1744
1745                     return false;
1746                 }
1747             };
1748         }
1749     }
1750
1751     /**
1752      * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1753      * extend it, and then unlock it.
1754      */
1755     private class MyThread extends Thread {
1756         AssertionError err = null;
1757
1758         public MyThread() {
1759             setDaemon(true);
1760         }
1761
1762         @Override
1763         public void run() {
1764             try {
1765                 for (int x = 0; x < MAX_LOOPS; ++x) {
1766                     makeAttempt();
1767                 }
1768
1769             } catch (AssertionError e) {
1770                 err = e;
1771             }
1772         }
1773
1774         private void makeAttempt() {
1775             try {
1776                 Semaphore sem = new Semaphore(0);
1777
1778                 LockCallback cb = new LockCallback() {
1779                     @Override
1780                     public void lockAvailable(Lock lock) {
1781                         sem.release();
1782                     }
1783
1784                     @Override
1785                     public void lockUnavailable(Lock lock) {
1786                         sem.release();
1787                     }
1788                 };
1789
1790                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1791
1792                 // wait for callback, whether available or unavailable
1793                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1794                 if (!lock.isActive()) {
1795                     return;
1796                 }
1797
1798                 nsuccesses.incrementAndGet();
1799
1800                 assertEquals(1, nactive.incrementAndGet());
1801
1802                 lock.extend(HOLD_SEC2, cb);
1803                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1804                 assertTrue(lock.isActive());
1805
1806                 // decrement BEFORE free()
1807                 nactive.decrementAndGet();
1808
1809                 assertTrue(lock.free());
1810                 assertTrue(lock.isUnavailable());
1811
1812             } catch (InterruptedException e) {
1813                 Thread.currentThread().interrupt();
1814                 throw new AssertionError("interrupted", e);
1815             }
1816         }
1817     }
1818 }