c996d8d97e8046cede0b5906848ad0da63289568
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.distributed.locking;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32 import static org.mockito.Matchers.any;
33 import static org.mockito.Matchers.anyBoolean;
34 import static org.mockito.Matchers.anyLong;
35 import static org.mockito.Matchers.eq;
36 import static org.mockito.Mockito.doThrow;
37 import static org.mockito.Mockito.mock;
38 import static org.mockito.Mockito.never;
39 import static org.mockito.Mockito.times;
40 import static org.mockito.Mockito.verify;
41 import static org.mockito.Mockito.when;
42
43 import java.io.ByteArrayInputStream;
44 import java.io.ByteArrayOutputStream;
45 import java.io.ObjectInputStream;
46 import java.io.ObjectOutputStream;
47 import java.sql.Connection;
48 import java.sql.DriverManager;
49 import java.sql.PreparedStatement;
50 import java.sql.ResultSet;
51 import java.sql.SQLException;
52 import java.sql.SQLTransientException;
53 import java.util.ArrayList;
54 import java.util.List;
55 import java.util.Properties;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.RejectedExecutionException;
58 import java.util.concurrent.ScheduledExecutorService;
59 import java.util.concurrent.ScheduledFuture;
60 import java.util.concurrent.Semaphore;
61 import java.util.concurrent.TimeUnit;
62 import java.util.concurrent.atomic.AtomicBoolean;
63 import java.util.concurrent.atomic.AtomicInteger;
64 import java.util.concurrent.atomic.AtomicReference;
65 import org.apache.commons.dbcp2.BasicDataSource;
66 import org.junit.After;
67 import org.junit.AfterClass;
68 import org.junit.Before;
69 import org.junit.BeforeClass;
70 import org.junit.Test;
71 import org.mockito.ArgumentCaptor;
72 import org.mockito.Mock;
73 import org.mockito.MockitoAnnotations;
74 import org.onap.policy.common.utils.services.OrderedServiceImpl;
75 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
76 import org.onap.policy.drools.core.lock.Lock;
77 import org.onap.policy.drools.core.lock.LockCallback;
78 import org.onap.policy.drools.core.lock.LockState;
79 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
80 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
81 import org.onap.policy.drools.system.PolicyEngine;
82 import org.onap.policy.drools.system.PolicyEngineConstants;
83 import org.powermock.reflect.Whitebox;
84
85 public class DistributedLockManagerTest {
    // delay (seconds, per the name) that the tests expect for a normally scheduled expiration check
    private static final long EXPIRE_SEC = 900L;
    // shorter delay (seconds, per the name) that the tests expect when a check is rescheduled after a DB error
    private static final long RETRY_SEC = 60L;
    // name of the policy engine manager's field that is read and replaced via Whitebox
    private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
    // host and owner values used to make DB records look like they belong to someone else
    private static final String OTHER_HOST = "other-host";
    private static final String OTHER_OWNER = "other-owner";
    // message placed in exceptions that the tests throw deliberately
    private static final String EXPECTED_EXCEPTION = "expected exception";
    // in-memory H2 DB; the INIT clause creates and selects the "pooling" schema
    private static final String DB_CONNECTION =
                    "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
    private static final String DB_USER = "user";
    private static final String DB_PASSWORD = "password";
    // owner key and resource identifiers passed when creating locks
    private static final String OWNER_KEY = "my key";
    private static final String RESOURCE = "my resource";
    private static final String RESOURCE2 = "my resource #2";
    private static final String RESOURCE3 = "my resource #3";
    private static final String RESOURCE4 = "my resource #4";
    private static final String RESOURCE5 = "my resource #5";
    // hold times passed to createLock()/extend()
    private static final int HOLD_SEC = 100;
    private static final int HOLD_SEC2 = 120;
    private static final int MAX_THREADS = 5;
    private static final int MAX_LOOPS = 100;
    // flags passed to InvalidDbLockingFeature - presumably select the kind of SQL exception it throws; TODO confirm
    private static final boolean TRANSIENT = true;
    private static final boolean PERMANENT = false;

    // number of execute() calls before the first lock attempt
    private static final int PRE_LOCK_EXECS = 1;

    // number of execute() calls before the first schedule attempt
    private static final int PRE_SCHED_EXECS = 1;

    // connection shared by all tests; opened in setUpBeforeClass() and closed in tearDownAfterClass()
    private static Connection conn = null;
    // executor originally held by the policy engine manager; restored in tearDownAfterClass()
    private static ScheduledExecutorService saveExec;
    // real thread pool created once for the whole class
    private static ScheduledExecutorService realExec;

    @Mock
    private PolicyEngine engine;

    @Mock
    private ScheduledExecutorService exsvc;

    @Mock
    private ScheduledFuture<?> checker;

    @Mock
    private LockCallback callback;

    @Mock
    private BasicDataSource datasrc;

    // lock under test; assigned by the individual test methods
    private DistributedLock lock;

    // counters that are reset to zero before each test
    private AtomicInteger nactive;
    private AtomicInteger nsuccesses;
    // feature under test; re-created before each test and stopped afterwards
    private DistributedLockManager feature;
139
140
141     /**
142      * Configures the location of the property files and creates the DB.
143      *
144      * @throws SQLException if the DB cannot be created
145      */
146     @BeforeClass
147     public static void setUpBeforeClass() throws SQLException {
148         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
149
150         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
151
152         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
153                         + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
154                         + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
155             createStmt.executeUpdate();
156         }
157
158         saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
159
160         realExec = Executors.newScheduledThreadPool(3);
161     }
162
163     /**
164      * Restores static fields.
165      */
166     @AfterClass
167     public static void tearDownAfterClass() throws SQLException {
168         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
169         realExec.shutdown();
170         conn.close();
171     }
172
173     /**
174      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
175      * tasks.
176      *
177      * @throws SQLException if the lock records cannot be deleted from the DB
178      */
179     @Before
180     public void setUp() throws SQLException {
181         MockitoAnnotations.initMocks(this);
182
183         nactive = new AtomicInteger(0);
184         nsuccesses = new AtomicInteger(0);
185
186         cleanDb();
187
188         feature = new MyLockingFeature(true);
189     }
190
191     @After
192     public void tearDown() throws SQLException {
193         shutdownFeature();
194         cleanDb();
195     }
196
197     private void cleanDb() throws SQLException {
198         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
199             stmt.executeUpdate();
200         }
201     }
202
203     private void shutdownFeature() {
204         if (feature != null) {
205             feature.afterStop(engine);
206             feature = null;
207         }
208     }
209
210     /**
211      * Tests that the feature is found in the expected service sets.
212      */
213     @Test
214     public void testServiceApis() {
215         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
216                         .anyMatch(obj -> obj instanceof DistributedLockManager));
217     }
218
219     @Test
220     public void testGetSequenceNumber() {
221         assertEquals(1000, feature.getSequenceNumber());
222     }
223
224     @Test
225     public void testBeforeCreateLockManager() {
226         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
227     }
228
229     /**
230      * Tests beforeCreate(), when getProperties() throws a runtime exception.
231      */
232     @Test
233     public void testBeforeCreateLockManagerEx() {
234         shutdownFeature();
235
236         feature = new MyLockingFeature(false) {
237             @Override
238             protected Properties getProperties(String fileName) {
239                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
240             }
241         };
242
243         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
244                         .isInstanceOf(DistributedLockManagerException.class);
245     }
246
247     @Test
248     public void testAfterStart() {
249         // verify that cleanup & expire check are both added to the queue
250         verify(exsvc).execute(any());
251         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
252     }
253
254     /**
255      * Tests afterStart(), when thread pool throws a runtime exception.
256      */
257     @Test
258     public void testAfterStartExInThreadPool() {
259         shutdownFeature();
260
261         feature = new MyLockingFeature(false);
262
263         doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
264
265         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
266     }
267
268     @Test
269     public void testDeleteExpiredDbLocks() throws SQLException {
270         // add records: two expired, one not
271         insertRecord(RESOURCE, feature.getUuidString(), -1);
272         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
273         insertRecord(RESOURCE3, OTHER_OWNER, 0);
274         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
275
276         // get the clean-up function and execute it
277         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
278         verify(exsvc).execute(captor.capture());
279
280         long tbegin = System.currentTimeMillis();
281         Runnable action = captor.getValue();
282         action.run();
283
284         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
285         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
286         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
287         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
288
289         assertEquals(2, getRecordCount());
290     }
291
292     /**
293      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
294      *
295      * @throws SQLException if an error occurs
296      */
297     @Test
298     public void testDeleteExpiredDbLocksEx() throws SQLException {
299         feature = new InvalidDbLockingFeature(TRANSIENT);
300
301         // get the clean-up function and execute it
302         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
303         verify(exsvc).execute(captor.capture());
304
305         Runnable action = captor.getValue();
306
307         // should not throw an exception
308         action.run();
309     }
310
311     @Test
312     public void testAfterStop() {
313         shutdownFeature();
314         verify(checker).cancel(anyBoolean());
315
316         feature = new DistributedLockManager();
317
318         // shutdown without calling afterStart()
319
320         shutdownFeature();
321     }
322
323     /**
324      * Tests afterStop(), when the data source throws an exception when close() is called.
325      *
326      * @throws SQLException if an error occurs
327      */
328     @Test
329     public void testAfterStopEx() throws SQLException {
330         shutdownFeature();
331
332         // use a data source that throws an exception when closed
333         feature = new InvalidDbLockingFeature(TRANSIENT);
334
335         shutdownFeature();
336     }
337
    /**
     * Walks through the life of a lock request: the first request for a resource is
     * left waiting, further requests for the same resource are denied immediately,
     * and the first request only becomes active (and gets a DB record) once its
     * queued task is run.
     *
     * @throws SQLException if an error occurs
     */
    @Test
    public void testCreateLock() throws SQLException {
        // only the task queued during set-up should be present so far
        verify(exsvc).execute(any());

        lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
        assertTrue(lock.isWaiting());

        // creating the lock should have queued exactly one more task
        verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());

        // this lock should fail
        LockCallback callback2 = mock(LockCallback.class);
        DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
        assertTrue(lock2.isUnavailable());
        verify(callback2, never()).lockAvailable(lock2);
        verify(callback2).lockUnavailable(lock2);

        // this should fail, too
        LockCallback callback3 = mock(LockCallback.class);
        DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
        assertTrue(lock3.isUnavailable());
        verify(callback3, never()).lockAvailable(lock3);
        verify(callback3).lockUnavailable(lock3);

        // no change to first
        assertTrue(lock.isWaiting());

        // no callbacks to the first lock
        verify(callback, never()).lockAvailable(lock);
        verify(callback, never()).lockUnavailable(lock);

        // nothing has been written to the DB while the request is still queued
        assertTrue(lock.isWaiting());
        assertEquals(0, getRecordCount());

        // run the queued request for the first lock; it should now be granted and recorded
        runLock(0, 0);
        assertTrue(lock.isActive());
        assertEquals(1, getRecordCount());

        verify(callback).lockAvailable(lock);
        verify(callback, never()).lockUnavailable(lock);

        // this should succeed
        DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
        assertTrue(lock4.isWaiting());

        // after running checker, original records should still remain
        runChecker(0, 0, EXPIRE_SEC);
        assertEquals(1, getRecordCount());
        verify(callback, never()).lockUnavailable(lock);
    }
387
388     /**
389      * Tests createLock() when the feature is not the latest instance.
390      */
391     @Test
392     public void testCreateLockNotLatestInstance() {
393         DistributedLockManager.setLatestInstance(null);
394
395         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
396         assertTrue(lock.isUnavailable());
397         verify(callback, never()).lockAvailable(any());
398         verify(callback).lockUnavailable(lock);
399     }
400
    /**
     * Verifies that the expiration checker marks locks unavailable when their DB
     * record has expired or now names a different host or owner, that it leaves the
     * remaining locks active, and that it schedules another check afterwards.
     *
     * @throws SQLException if an error occurs
     */
    @Test
    public void testCheckExpired() throws SQLException {
        // create five locks, running each one's queued request as it is created
        lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
        runLock(0, 0);

        LockCallback callback2 = mock(LockCallback.class);
        final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
        runLock(1, 0);

        LockCallback callback3 = mock(LockCallback.class);
        final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
        runLock(2, 0);

        LockCallback callback4 = mock(LockCallback.class);
        final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
        runLock(3, 0);

        LockCallback callback5 = mock(LockCallback.class);
        final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
        runLock(4, 0);

        assertEquals(5, getRecordCount());

        // expire one record
        updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);

        // change host of another record
        updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);

        // change uuid of another record
        updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);


        // run the checker
        runChecker(0, 0, EXPIRE_SEC);


        // check lock states: only the locks whose records were tampered with are lost
        assertTrue(lock.isUnavailable());
        assertTrue(lock2.isActive());
        assertTrue(lock3.isUnavailable());
        assertTrue(lock4.isActive());
        assertTrue(lock5.isUnavailable());

        // allow callbacks
        // NOTE(review): the arguments to runLock() are defined outside this view; the calls
        // below appear to run the three tasks queued by the checker, one at a time - confirm
        runLock(5, 2);
        runLock(6, 1);
        runLock(7, 0);
        verify(callback).lockUnavailable(lock);
        verify(callback3).lockUnavailable(lock3);
        verify(callback5).lockUnavailable(lock5);

        verify(callback2, never()).lockUnavailable(lock2);
        verify(callback4, never()).lockUnavailable(lock4);


        // another check should have been scheduled, with the normal interval
        runChecker(1, 0, EXPIRE_SEC);
    }
460
461     /**
462      * Tests checkExpired(), when schedule() throws an exception.
463      */
464     @Test
465     public void testCheckExpiredExecRejected() {
466         // arrange for execution to be rejected
467         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
468                         .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
469
470         runChecker(0, 0, EXPIRE_SEC);
471     }
472
473     /**
474      * Tests checkExpired(), when getConnection() throws an exception.
475      */
476     @Test
477     public void testCheckExpiredSqlEx() {
478         // use a data source that throws an exception when getConnection() is called
479         feature = new InvalidDbLockingFeature(TRANSIENT);
480
481         runChecker(0, 0, EXPIRE_SEC);
482
483         // it should have scheduled another check, sooner
484         runChecker(0, 0, RETRY_SEC);
485     }
486
487     /**
488      * Tests checkExpired(), when getConnection() throws an exception and the feature is
489      * no longer alive.
490      */
491     @Test
492     public void testCheckExpiredSqlExFeatureStopped() {
493         // use a data source that throws an exception when getConnection() is called
494         feature = new InvalidDbLockingFeature(TRANSIENT) {
495             @Override
496             protected SQLException makeEx() {
497                 this.stop();
498                 return super.makeEx();
499             }
500         };
501
502         runChecker(0, 0, EXPIRE_SEC);
503
504         // it should NOT have scheduled another check
505         verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
506     }
507
    /**
     * Verifies how the expiration checker treats locks in different states: a lock
     * whose record has expired becomes unavailable, an untouched active lock stays
     * active, a lock that is still waiting is left alone, and a lock that is freed
     * while the checker is running receives no "unavailable" callback.
     *
     * @throws SQLException if an error occurs
     */
    @Test
    public void testExpireLocks() throws SQLException {
        // lock to be freed the next time a DB connection is requested, if any
        AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);

        feature = new MyLockingFeature(true) {
            @Override
            protected BasicDataSource makeDataSource() throws Exception {
                // get the real data source
                BasicDataSource src2 = super.makeDataSource();

                // hand out real connections via the mock, freeing "freeLock" (once) on the way
                when(datasrc.getConnection()).thenAnswer(answer -> {
                    DistributedLock lck = freeLock.getAndSet(null);
                    if (lck != null) {
                        // free it
                        lck.free();

                        // run its doUnlock
                        runLock(4, 0);
                    }

                    return src2.getConnection();
                });

                return datasrc;
            }
        };

        lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
        runLock(0, 0);

        LockCallback callback2 = mock(LockCallback.class);
        final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
        runLock(1, 0);

        LockCallback callback3 = mock(LockCallback.class);
        final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
        // don't run doLock for lock3 - leave it in the waiting state

        LockCallback callback4 = mock(LockCallback.class);
        final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
        runLock(3, 0);

        // lock3 has no record, as its request was never run
        assertEquals(3, getRecordCount());

        // expire one record
        updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);

        // arrange to free lock4 while the checker is running
        freeLock.set(lock4);

        // run the checker
        runChecker(0, 0, EXPIRE_SEC);


        // check lock states
        assertTrue(lock.isUnavailable());
        assertTrue(lock2.isActive());
        assertTrue(lock3.isWaiting());
        assertTrue(lock4.isUnavailable());

        // run the remaining queued task, then confirm the total number of tasks queued
        runLock(5, 0);
        verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any());

        // only the expired lock is told that it became unavailable
        verify(callback).lockUnavailable(lock);
        verify(callback2, never()).lockUnavailable(lock2);
        verify(callback3, never()).lockUnavailable(lock3);
        verify(callback4, never()).lockUnavailable(lock4);
    }
576
577     @Test
578     public void testDistributedLockNoArgs() {
579         DistributedLock lock = new DistributedLock();
580         assertNull(lock.getResourceId());
581         assertNull(lock.getOwnerKey());
582         assertNull(lock.getCallback());
583         assertEquals(0, lock.getHoldSec());
584     }
585
586     @Test
587     public void testDistributedLock() {
588         assertThatIllegalArgumentException()
589                         .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
590                         .withMessageContaining("holdSec");
591
592         // should generate no exception
593         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
594     }
595
596     @Test
597     public void testDistributedLockSerializable() throws Exception {
598         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
599         lock = roundTrip(lock);
600
601         assertTrue(lock.isWaiting());
602
603         assertEquals(RESOURCE, lock.getResourceId());
604         assertEquals(OWNER_KEY, lock.getOwnerKey());
605         assertNull(lock.getCallback());
606         assertEquals(HOLD_SEC, lock.getHoldSec());
607     }
608
609     @Test
610     public void testGrant() {
611         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
612         assertFalse(lock.isActive());
613
614         // execute the doLock() call
615         runLock(0, 0);
616
617         assertTrue(lock.isActive());
618
619         // the callback for the lock should have been run in the foreground thread
620         verify(callback).lockAvailable(lock);
621     }
622
623     @Test
624     public void testDistributedLockDeny() {
625         // get a lock
626         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
627
628         // get another lock - should fail
629         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
630
631         assertTrue(lock.isUnavailable());
632
633         // the callback for the second lock should have been run in the foreground thread
634         verify(callback).lockUnavailable(lock);
635
636         // should only have a request for the first lock
637         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
638     }
639
640     @Test
641     public void testDistributedLockFree() {
642         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
643
644         assertTrue(lock.free());
645         assertTrue(lock.isUnavailable());
646
647         // run both requests associated with the lock
648         runLock(0, 1);
649         runLock(1, 0);
650
651         // should not have changed state
652         assertTrue(lock.isUnavailable());
653
654         // attempt to free it again
655         assertFalse(lock.free());
656
657         // should not have queued anything else
658         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
659
660         // new lock should succeed
661         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
662         assertTrue(lock2 != lock);
663         assertTrue(lock2.isWaiting());
664     }
665
666     /**
667      * Tests that free() works on a serialized lock with a new feature.
668      *
669      * @throws Exception if an error occurs
670      */
671     @Test
672     public void testDistributedLockFreeSerialized() throws Exception {
673         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
674
675         feature = new MyLockingFeature(true);
676
677         lock = roundTrip(lock);
678         assertTrue(lock.free());
679         assertTrue(lock.isUnavailable());
680     }
681
682     /**
683      * Tests free() on a serialized lock without a feature.
684      *
685      * @throws Exception if an error occurs
686      */
687     @Test
688     public void testDistributedLockFreeNoFeature() throws Exception {
689         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
690
691         DistributedLockManager.setLatestInstance(null);
692
693         lock = roundTrip(lock);
694         assertFalse(lock.free());
695         assertTrue(lock.isUnavailable());
696     }
697
698     /**
699      * Tests the case where the lock is freed and doUnlock called between the call to
700      * isUnavailable() and the call to compute().
701      */
702     @Test
703     public void testDistributedLockFreeUnlocked() {
704         feature = new FreeWithFreeLockingFeature(true);
705
706         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
707
708         assertFalse(lock.free());
709         assertTrue(lock.isUnavailable());
710     }
711
712     /**
713      * Tests the case where the lock is freed, but doUnlock is not completed, between the
714      * call to isUnavailable() and the call to compute().
715      */
716     @Test
717     public void testDistributedLockFreeLockFreed() {
718         feature = new FreeWithFreeLockingFeature(false);
719
720         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
721
722         assertFalse(lock.free());
723         assertTrue(lock.isUnavailable());
724     }
725
    /**
     * Exercises extend(): a lock that was denied stays denied when extended, even if
     * its state is forced to active; a negative hold time is rejected; and extending
     * a lock that is genuinely held puts it back into the waiting state until the
     * queued doExtend() request is run.
     */
    @Test
    public void testDistributedLockExtend() {
        lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);

        // lock2 should be denied - called back by this thread
        DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
        verify(callback, never()).lockAvailable(lock2);
        verify(callback).lockUnavailable(lock2);

        // lock2 will still be denied - called back by this thread
        lock2.extend(HOLD_SEC, callback);
        verify(callback, times(2)).lockUnavailable(lock2);

        // force lock2 to be active - should still be denied
        Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
        lock2.extend(HOLD_SEC, callback);
        verify(callback, times(3)).lockUnavailable(lock2);

        // a negative hold time is rejected, with a message naming the argument
        assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
                        .withMessageContaining("holdSec");

        // execute doLock()
        runLock(0, 0);
        assertTrue(lock.isActive());

        // now extend the first lock
        LockCallback callback2 = mock(LockCallback.class);
        lock.extend(HOLD_SEC2, callback2);
        assertTrue(lock.isWaiting());

        // execute doExtend()
        runLock(1, 0);
        // NOTE(review): extend() is invoked a second time here, before the assertions - confirm this is intended
        lock.extend(HOLD_SEC2, callback2);
        assertEquals(HOLD_SEC2, lock.getHoldSec());
        verify(callback2).lockAvailable(lock);
        verify(callback2, never()).lockUnavailable(lock);
    }
763
764     /**
765      * Tests that extend() works on a serialized lock with a new feature.
766      *
767      * @throws Exception if an error occurs
768      */
769     @Test
770     public void testDistributedLockExtendSerialized() throws Exception {
771         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
772
773         // run doLock
774         runLock(0, 0);
775         assertTrue(lock.isActive());
776
777         feature = new MyLockingFeature(true);
778
779         lock = roundTrip(lock);
780         assertTrue(lock.isActive());
781
782         LockCallback scallback = mock(LockCallback.class);
783
784         lock.extend(HOLD_SEC, scallback);
785         assertTrue(lock.isWaiting());
786
787         // run doExtend (in new feature)
788         runLock(0, 0);
789         assertTrue(lock.isActive());
790
791         verify(scallback).lockAvailable(lock);
792         verify(scallback, never()).lockUnavailable(lock);
793     }
794
795     /**
796      * Tests extend() on a serialized lock without a feature.
797      *
798      * @throws Exception if an error occurs
799      */
800     @Test
801     public void testDistributedLockExtendNoFeature() throws Exception {
802         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
803
804         // run doLock
805         runLock(0, 0);
806         assertTrue(lock.isActive());
807
808         DistributedLockManager.setLatestInstance(null);
809
810         lock = roundTrip(lock);
811         assertTrue(lock.isActive());
812
813         LockCallback scallback = mock(LockCallback.class);
814
815         lock.extend(HOLD_SEC, scallback);
816         assertTrue(lock.isUnavailable());
817
818         verify(scallback, never()).lockAvailable(lock);
819         verify(scallback).lockUnavailable(lock);
820     }
821
822     /**
823      * Tests the case where the lock is freed and doUnlock called between the call to
824      * isUnavailable() and the call to compute().
825      */
826     @Test
827     public void testDistributedLockExtendUnlocked() {
828         feature = new FreeWithFreeLockingFeature(true);
829
830         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
831
832         lock.extend(HOLD_SEC2, callback);
833         assertTrue(lock.isUnavailable());
834         verify(callback).lockUnavailable(lock);
835     }
836
837     /**
838      * Tests the case where the lock is freed, but doUnlock is not completed, between the
839      * call to isUnavailable() and the call to compute().
840      */
841     @Test
842     public void testDistributedLockExtendLockFreed() {
843         feature = new FreeWithFreeLockingFeature(false);
844
845         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
846
847         lock.extend(HOLD_SEC2, callback);
848         assertTrue(lock.isUnavailable());
849         verify(callback).lockUnavailable(lock);
850     }
851
852     @Test
853     public void testDistributedLockScheduleRequest() {
854         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
855         runLock(0, 0);
856
857         verify(callback).lockAvailable(lock);
858     }
859
860     @Test
861     public void testDistributedLockRescheduleRequest() throws SQLException {
862         // use a data source that throws an exception when getConnection() is called
863         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
864         feature = invfeat;
865
866         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
867
868         // invoke doLock - should fail and reschedule
869         runLock(0, 0);
870
871         // should still be waiting
872         assertTrue(lock.isWaiting());
873         verify(callback, never()).lockUnavailable(lock);
874
875         // free the lock while doLock is executing
876         invfeat.freeLock = true;
877
878         // try scheduled request - should just invoke doUnlock
879         runSchedule(0, 0);
880
881         // should still be waiting
882         assertTrue(lock.isUnavailable());
883         verify(callback, never()).lockUnavailable(lock);
884
885         // should have scheduled a retry of doUnlock
886         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
887     }
888
889     @Test
890     public void testDistributedLockGetNextRequest() {
891         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
892
893         /*
894          * run doLock. This should cause getNextRequest() to be called twice, once with a
895          * request in the queue, and the second time with request=null.
896          */
897         runLock(0, 0);
898     }
899
900     /**
901      * Tests getNextRequest(), where the same request is still in the queue the second
902      * time it's called.
903      */
904     @Test
905     public void testDistributedLockGetNextRequestSameRequest() {
906         // force reschedule to be invoked
907         feature = new InvalidDbLockingFeature(TRANSIENT);
908
909         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
910
911         /*
912          * run doLock. This should cause getNextRequest() to be called twice, once with a
913          * request in the queue, and the second time with the same request again.
914          */
915         runLock(0, 0);
916
917         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
918     }
919
920     @Test
921     public void testDistributedLockDoRequest() throws SQLException {
922         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
923
924         assertTrue(lock.isWaiting());
925
926         // run doLock via doRequest
927         runLock(0, 0);
928
929         assertTrue(lock.isActive());
930     }
931
932     /**
933      * Tests doRequest(), when doRequest() is already running within another thread.
934      */
935     @Test
936     public void testDistributedLockDoRequestBusy() {
937         /*
938          * this feature will invoke a request in a background thread while it's being run
939          * in a foreground thread.
940          */
941         AtomicBoolean running = new AtomicBoolean(false);
942         AtomicBoolean returned = new AtomicBoolean(false);
943
944         feature = new MyLockingFeature(true) {
945             @Override
946             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
947                             LockCallback callback) {
948                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
949                     private static final long serialVersionUID = 1L;
950
951                     @Override
952                     protected boolean doDbInsert(Connection conn) throws SQLException {
953                         if (running.get()) {
954                             // already inside the thread - don't recurse any further
955                             return super.doDbInsert(conn);
956                         }
957
958                         running.set(true);
959
960                         Thread thread = new Thread(() -> {
961                             // run doLock from within the new thread
962                             runLock(0, 0);
963                         });
964                         thread.setDaemon(true);
965                         thread.start();
966
967                         // wait for the background thread to complete before continuing
968                         try {
969                             thread.join(5000);
970                         } catch (InterruptedException ignore) {
971                             Thread.currentThread().interrupt();
972                         }
973
974                         returned.set(!thread.isAlive());
975
976                         return super.doDbInsert(conn);
977                     }
978                 };
979             }
980         };
981
982         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
983
984         // run doLock
985         runLock(0, 0);
986
987         assertTrue(returned.get());
988     }
989
990     /**
991      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
992      *
993      * @throws SQLException if an error occurs
994      */
995     @Test
996     public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
997         // throw run-time exception
998         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
999
1000         // use a data source that throws an exception when getConnection() is called
1001         feature = new MyLockingFeature(true) {
1002             @Override
1003             protected BasicDataSource makeDataSource() throws Exception {
1004                 return datasrc;
1005             }
1006         };
1007
1008         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1009
1010         // invoke doLock - should NOT reschedule
1011         runLock(0, 0);
1012
1013         assertTrue(lock.isUnavailable());
1014         verify(callback).lockUnavailable(lock);
1015
1016         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1017     }
1018
1019     /**
1020      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1021      * state.
1022      *
1023      * @throws SQLException if an error occurs
1024      */
1025     @Test
1026     public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1027         // throw run-time exception
1028         when(datasrc.getConnection()).thenAnswer(answer -> {
1029             lock.free();
1030             throw new IllegalStateException(EXPECTED_EXCEPTION);
1031         });
1032
1033         // use a data source that throws an exception when getConnection() is called
1034         feature = new MyLockingFeature(true) {
1035             @Override
1036             protected BasicDataSource makeDataSource() throws Exception {
1037                 return datasrc;
1038             }
1039         };
1040
1041         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1042
1043         // invoke doLock - should NOT reschedule
1044         runLock(0, 0);
1045
1046         assertTrue(lock.isUnavailable());
1047         verify(callback, never()).lockUnavailable(lock);
1048
1049         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1050     }
1051
1052     /**
1053      * Tests doRequest() when the retry count gets exhausted.
1054      */
1055     @Test
1056     public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1057         // use a data source that throws an exception when getConnection() is called
1058         feature = new InvalidDbLockingFeature(TRANSIENT);
1059
1060         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1061
1062         // invoke doLock - should fail and reschedule
1063         runLock(0, 0);
1064
1065         // should still be waiting
1066         assertTrue(lock.isWaiting());
1067         verify(callback, never()).lockUnavailable(lock);
1068
1069         // try again, via SCHEDULER - first retry fails
1070         runSchedule(0, 0);
1071
1072         // should still be waiting
1073         assertTrue(lock.isWaiting());
1074         verify(callback, never()).lockUnavailable(lock);
1075
1076         // try again, via SCHEDULER - final retry fails
1077         runSchedule(1, 0);
1078         assertTrue(lock.isUnavailable());
1079
1080         // now callback should have been called
1081         verify(callback).lockUnavailable(lock);
1082     }
1083
1084     /**
1085      * Tests doRequest() when a non-transient DB exception is thrown.
1086      */
1087     @Test
1088     public void testDistributedLockDoRequestNotTransient() {
1089         /*
1090          * use a data source that throws a PERMANENT exception when getConnection() is
1091          * called
1092          */
1093         feature = new InvalidDbLockingFeature(PERMANENT);
1094
1095         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1096
1097         // invoke doLock - should fail
1098         runLock(0, 0);
1099
1100         assertTrue(lock.isUnavailable());
1101         verify(callback).lockUnavailable(lock);
1102
1103         // should not have scheduled anything new
1104         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1105         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1106     }
1107
1108     @Test
1109     public void testDistributedLockDoLock() throws SQLException {
1110         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1111
1112         // invoke doLock - should simply do an insert
1113         long tbegin = System.currentTimeMillis();
1114         runLock(0, 0);
1115
1116         assertEquals(1, getRecordCount());
1117         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1118         verify(callback).lockAvailable(lock);
1119     }
1120
1121     /**
1122      * Tests doLock() when the lock is freed before doLock runs.
1123      *
1124      * @throws SQLException if an error occurs
1125      */
1126     @Test
1127     public void testDistributedLockDoLockFreed() throws SQLException {
1128         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1129
1130         lock.setState(LockState.UNAVAILABLE);
1131
1132         // invoke doLock - should do nothing
1133         runLock(0, 0);
1134
1135         assertEquals(0, getRecordCount());
1136
1137         verify(callback, never()).lockAvailable(lock);
1138     }
1139
1140     /**
1141      * Tests doLock() when a DB exception is thrown.
1142      */
1143     @Test
1144     public void testDistributedLockDoLockEx() {
1145         // use a data source that throws an exception when getConnection() is called
1146         feature = new InvalidDbLockingFeature(PERMANENT);
1147
1148         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1149
1150         // invoke doLock - should simply do an insert
1151         runLock(0, 0);
1152
1153         // lock should have failed due to exception
1154         verify(callback).lockUnavailable(lock);
1155     }
1156
1157     /**
1158      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1159      * to be called.
1160      */
1161     @Test
1162     public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1163         // insert an expired record
1164         insertRecord(RESOURCE, feature.getUuidString(), 0);
1165
1166         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1167
1168         // invoke doLock - should simply do an update
1169         runLock(0, 0);
1170         verify(callback).lockAvailable(lock);
1171     }
1172
1173     /**
1174      * Tests doLock() when a locked record already exists.
1175      */
1176     @Test
1177     public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1178         // insert an expired record
1179         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1180
1181         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1182
1183         // invoke doLock
1184         runLock(0, 0);
1185
1186         // lock should have failed because it's already locked
1187         verify(callback).lockUnavailable(lock);
1188     }
1189
1190     @Test
1191     public void testDistributedLockDoUnlock() throws SQLException {
1192         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1193
1194         // invoke doLock()
1195         runLock(0, 0);
1196
1197         lock.free();
1198
1199         // invoke doUnlock()
1200         long tbegin = System.currentTimeMillis();
1201         runLock(1, 0);
1202
1203         assertEquals(0, getRecordCount());
1204         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1205
1206         assertTrue(lock.isUnavailable());
1207
1208         // no more callbacks should have occurred
1209         verify(callback, times(1)).lockAvailable(lock);
1210         verify(callback, never()).lockUnavailable(lock);
1211     }
1212
1213     /**
1214      * Tests doUnlock() when a DB exception is thrown.
1215      *
1216      * @throws SQLException if an error occurs
1217      */
1218     @Test
1219     public void testDistributedLockDoUnlockEx() throws SQLException {
1220         feature = new InvalidDbLockingFeature(PERMANENT);
1221
1222         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1223
1224         // do NOT invoke doLock() - it will fail without a DB connection
1225
1226         lock.free();
1227
1228         // invoke doUnlock()
1229         runLock(1, 0);
1230
1231         assertTrue(lock.isUnavailable());
1232
1233         // no more callbacks should have occurred
1234         verify(callback, never()).lockAvailable(lock);
1235         verify(callback, never()).lockUnavailable(lock);
1236     }
1237
1238     @Test
1239     public void testDistributedLockDoExtend() throws SQLException {
1240         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1241         runLock(0, 0);
1242
1243         LockCallback callback2 = mock(LockCallback.class);
1244         lock.extend(HOLD_SEC2, callback2);
1245
1246         // call doExtend()
1247         long tbegin = System.currentTimeMillis();
1248         runLock(1, 0);
1249
1250         assertEquals(1, getRecordCount());
1251         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1252
1253         assertTrue(lock.isActive());
1254
1255         // no more callbacks should have occurred
1256         verify(callback).lockAvailable(lock);
1257         verify(callback, never()).lockUnavailable(lock);
1258
1259         // extension should have succeeded
1260         verify(callback2).lockAvailable(lock);
1261         verify(callback2, never()).lockUnavailable(lock);
1262     }
1263
1264     /**
1265      * Tests doExtend() when the lock is freed before doExtend runs.
1266      *
1267      * @throws SQLException if an error occurs
1268      */
1269     @Test
1270     public void testDistributedLockDoExtendFreed() throws SQLException {
1271         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1272         lock.extend(HOLD_SEC2, callback);
1273
1274         lock.setState(LockState.UNAVAILABLE);
1275
1276         // invoke doExtend - should do nothing
1277         runLock(1, 0);
1278
1279         assertEquals(0, getRecordCount());
1280
1281         verify(callback, never()).lockAvailable(lock);
1282     }
1283
1284     /**
1285      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1286      * insert.
1287      *
1288      * @throws SQLException if an error occurs
1289      */
1290     @Test
1291     public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1292         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1293         runLock(0, 0);
1294
1295         LockCallback callback2 = mock(LockCallback.class);
1296         lock.extend(HOLD_SEC2, callback2);
1297
1298         // delete the record so it's forced to re-insert it
1299         cleanDb();
1300
1301         // call doExtend()
1302         long tbegin = System.currentTimeMillis();
1303         runLock(1, 0);
1304
1305         assertEquals(1, getRecordCount());
1306         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1307
1308         assertTrue(lock.isActive());
1309
1310         // no more callbacks should have occurred
1311         verify(callback).lockAvailable(lock);
1312         verify(callback, never()).lockUnavailable(lock);
1313
1314         // extension should have succeeded
1315         verify(callback2).lockAvailable(lock);
1316         verify(callback2, never()).lockUnavailable(lock);
1317     }
1318
1319     /**
1320      * Tests doExtend() when both update and insert fail.
1321      *
1322      * @throws SQLException if an error occurs
1323      */
1324     @Test
1325     public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1326         /*
1327          * this feature will create a lock that returns false when doDbUpdate() is
1328          * invoked, or when doDbInsert() is invoked a second time
1329          */
1330         feature = new MyLockingFeature(true) {
1331             @Override
1332             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1333                             LockCallback callback) {
1334                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1335                     private static final long serialVersionUID = 1L;
1336                     private int ntimes = 0;
1337
1338                     @Override
1339                     protected boolean doDbInsert(Connection conn) throws SQLException {
1340                         if (ntimes++ > 0) {
1341                             return false;
1342                         }
1343
1344                         return super.doDbInsert(conn);
1345                     }
1346
1347                     @Override
1348                     protected boolean doDbUpdate(Connection conn) {
1349                         return false;
1350                     }
1351                 };
1352             }
1353         };
1354
1355         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1356         runLock(0, 0);
1357
1358         LockCallback callback2 = mock(LockCallback.class);
1359         lock.extend(HOLD_SEC2, callback2);
1360
1361         // call doExtend()
1362         runLock(1, 0);
1363
1364         assertTrue(lock.isUnavailable());
1365
1366         // no more callbacks should have occurred
1367         verify(callback).lockAvailable(lock);
1368         verify(callback, never()).lockUnavailable(lock);
1369
1370         // extension should have failed
1371         verify(callback2, never()).lockAvailable(lock);
1372         verify(callback2).lockUnavailable(lock);
1373     }
1374
1375     /**
1376      * Tests doExtend() when an exception occurs.
1377      *
1378      * @throws SQLException if an error occurs
1379      */
1380     @Test
1381     public void testDistributedLockDoExtendEx() throws SQLException {
1382         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1383         runLock(0, 0);
1384
1385         /*
1386          * delete the record and insert one with a different owner, which will cause
1387          * doDbInsert() to throw an exception
1388          */
1389         cleanDb();
1390         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1391
1392         LockCallback callback2 = mock(LockCallback.class);
1393         lock.extend(HOLD_SEC2, callback2);
1394
1395         // call doExtend()
1396         runLock(1, 0);
1397
1398         assertTrue(lock.isUnavailable());
1399
1400         // no more callbacks should have occurred
1401         verify(callback).lockAvailable(lock);
1402         verify(callback, never()).lockUnavailable(lock);
1403
1404         // extension should have failed
1405         verify(callback2, never()).lockAvailable(lock);
1406         verify(callback2).lockUnavailable(lock);
1407     }
1408
1409     @Test
1410     public void testDistributedLockToString() {
1411         String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1412         assertNotNull(text);
1413         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1414     }
1415
1416     @Test
1417     public void testMakeThreadPool() {
1418         // use a REAL feature to test this
1419         feature = new DistributedLockManager();
1420
1421         // this should create a thread pool
1422         feature.beforeCreateLockManager(engine, new Properties());
1423         feature.afterStart(engine);
1424
1425         shutdownFeature();
1426     }
1427
1428     /**
1429      * Performs a multi-threaded test of the locking facility.
1430      *
1431      * @throws InterruptedException if the current thread is interrupted while waiting for
1432      *         the background threads to complete
1433      */
1434     @Test
1435     public void testMultiThreaded() throws InterruptedException {
1436         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1437
1438         feature = new DistributedLockManager();
1439         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1440         feature.afterStart(PolicyEngineConstants.getManager());
1441
1442         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1443         for (int x = 0; x < MAX_THREADS; ++x) {
1444             threads.add(new MyThread());
1445         }
1446
1447         threads.forEach(Thread::start);
1448
1449         for (MyThread thread : threads) {
1450             thread.join(6000);
1451             assertFalse(thread.isAlive());
1452         }
1453
1454         for (MyThread thread : threads) {
1455             if (thread.err != null) {
1456                 throw thread.err;
1457             }
1458         }
1459
1460         assertTrue(nsuccesses.get() > 0);
1461     }
1462
1463     private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1464                     boolean waitForLock) {
1465         return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1466     }
1467
1468     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1469         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1470         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1471             oos.writeObject(lock);
1472         }
1473
1474         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1475         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1476             return (DistributedLock) ois.readObject();
1477         }
1478     }
1479
1480     /**
1481      * Runs the checkExpired() action.
1482      *
1483      * @param nskip number of actions in the work queue to skip
1484      * @param nadditional number of additional actions that appear in the work queue
1485      *        <i>after</i> the checkExpired action
1486      * @param schedSec number of seconds for which the checker should have been scheduled
1487      */
1488     private void runChecker(int nskip, int nadditional, long schedSec) {
1489         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1490         verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1491         Runnable action = captor.getAllValues().get(nskip);
1492         action.run();
1493     }
1494
1495     /**
1496      * Runs a lock action (e.g., doLock, doUnlock).
1497      *
1498      * @param nskip number of actions in the work queue to skip
1499      * @param nadditional number of additional actions that appear in the work queue
1500      *        <i>after</i> the desired action
1501      */
1502     void runLock(int nskip, int nadditional) {
1503         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1504         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1505
1506         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1507         action.run();
1508     }
1509
1510     /**
1511      * Runs a scheduled action (e.g., "retry" action).
1512      *
1513      * @param nskip number of actions in the work queue to skip
1514      * @param nadditional number of additional actions that appear in the work queue
1515      *        <i>after</i> the desired action
1516      */
1517     void runSchedule(int nskip, int nadditional) {
1518         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1519         verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1520
1521         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1522         action.run();
1523     }
1524
1525     /**
1526      * Gets a count of the number of lock records in the DB.
1527      *
1528      * @return the number of lock records in the DB
1529      * @throws SQLException if an error occurs accessing the DB
1530      */
1531     private int getRecordCount() throws SQLException {
1532         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1533                         ResultSet result = stmt.executeQuery()) {
1534
1535             if (result.next()) {
1536                 return result.getInt(1);
1537
1538             } else {
1539                 return 0;
1540             }
1541         }
1542     }
1543
1544     /**
1545      * Determines if there is a record for the given resource whose expiration time is in
1546      * the expected range.
1547      *
1548      * @param resourceId ID of the resource of interest
1549      * @param uuidString UUID string of the owner
1550      * @param holdSec seconds for which the lock was to be held
1551      * @param tbegin earliest time, in milliseconds, at which the record could have been
1552      *        inserted into the DB
1553      * @return {@code true} if a record is found, {@code false} otherwise
1554      * @throws SQLException if an error occurs accessing the DB
1555      */
1556     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1557         try (PreparedStatement stmt =
1558                         conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1559                                         + " WHERE resourceId=? AND host=? AND owner=?")) {
1560
1561             stmt.setString(1, resourceId);
1562             stmt.setString(2, feature.getHostName());
1563             stmt.setString(3, uuidString);
1564
1565             try (ResultSet result = stmt.executeQuery()) {
1566                 if (result.next()) {
1567                     int remaining = result.getInt(1);
1568                     long maxDiff = System.currentTimeMillis() - tbegin;
1569                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1570
1571                 } else {
1572                     return false;
1573                 }
1574             }
1575         }
1576     }
1577
1578     /**
1579      * Inserts a record into the DB.
1580      *
1581      * @param resourceId ID of the resource of interest
1582      * @param uuidString UUID string of the owner
1583      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1584      * @throws SQLException if an error occurs accessing the DB
1585      */
1586     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1587         this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
1588     }
1589
1590     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1591                     throws SQLException {
1592         try (PreparedStatement stmt =
1593                         conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1594                                         + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1595
1596             stmt.setString(1, resourceId);
1597             stmt.setString(2, hostName);
1598             stmt.setString(3, uuidString);
1599             stmt.setInt(4, expireOffset);
1600
1601             assertEquals(1, stmt.executeUpdate());
1602         }
1603     }
1604
1605     /**
1606      * Updates a record in the DB.
1607      *
1608      * @param resourceId ID of the resource of interest
1609      * @param newUuid UUID string of the <i>new</i> owner
1610      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1611      * @throws SQLException if an error occurs accessing the DB
1612      */
1613     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1614         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1615                         + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1616
1617             stmt.setString(1, newHost);
1618             stmt.setString(2, newUuid);
1619             stmt.setInt(3, expireOffset);
1620             stmt.setString(4, resourceId);
1621
1622             assertEquals(1, stmt.executeUpdate());
1623         }
1624     }
1625
1626     /**
1627      * Feature that uses <i>exsvc</i> to execute requests.
1628      */
1629     private class MyLockingFeature extends DistributedLockManager {
1630
1631         public MyLockingFeature(boolean init) {
1632             shutdownFeature();
1633
1634             exsvc = mock(ScheduledExecutorService.class);
1635             when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1636             Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1637
1638             if (init) {
1639                 beforeCreateLockManager(engine, new Properties());
1640                 start();
1641                 afterStart(engine);
1642             }
1643         }
1644     }
1645
1646     /**
1647      * Feature whose data source all throws exceptions.
1648      */
1649     private class InvalidDbLockingFeature extends MyLockingFeature {
1650         private boolean isTransient;
1651         private boolean freeLock = false;
1652
1653         public InvalidDbLockingFeature(boolean isTransient) {
1654             // pass "false" because we have to set the error code BEFORE calling
1655             // afterStart()
1656             super(false);
1657
1658             this.isTransient = isTransient;
1659
1660             this.beforeCreateLockManager(engine, new Properties());
1661             this.start();
1662             this.afterStart(engine);
1663         }
1664
1665         @Override
1666         protected BasicDataSource makeDataSource() throws Exception {
1667             when(datasrc.getConnection()).thenAnswer(answer -> {
1668                 if (freeLock) {
1669                     freeLock = false;
1670                     lock.free();
1671                 }
1672
1673                 throw makeEx();
1674             });
1675
1676             doThrow(makeEx()).when(datasrc).close();
1677
1678             return datasrc;
1679         }
1680
1681         protected SQLException makeEx() {
1682             if (isTransient) {
1683                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1684
1685             } else {
1686                 return new SQLException(EXPECTED_EXCEPTION);
1687             }
1688         }
1689     }
1690
1691     /**
1692      * Feature whose locks free themselves while free() is already running.
1693      */
1694     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1695         private boolean relock;
1696
1697         public FreeWithFreeLockingFeature(boolean relock) {
1698             super(true);
1699             this.relock = relock;
1700         }
1701
1702         @Override
1703         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1704                         LockCallback callback) {
1705
1706             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1707                 private static final long serialVersionUID = 1L;
1708                 private boolean checked = false;
1709
1710                 @Override
1711                 public boolean isUnavailable() {
1712                     if (checked) {
1713                         return super.isUnavailable();
1714                     }
1715
1716                     checked = true;
1717
1718                     // release and relock
1719                     free();
1720
1721                     if (relock) {
1722                         // run doUnlock
1723                         runLock(1, 0);
1724
1725                         // relock it
1726                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1727                     }
1728
1729                     return false;
1730                 }
1731             };
1732         }
1733     }
1734
1735     /**
1736      * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1737      * extend it, and then unlock it.
1738      */
1739     private class MyThread extends Thread {
1740         AssertionError err = null;
1741
1742         public MyThread() {
1743             setDaemon(true);
1744         }
1745
1746         @Override
1747         public void run() {
1748             try {
1749                 for (int x = 0; x < MAX_LOOPS; ++x) {
1750                     makeAttempt();
1751                 }
1752
1753             } catch (AssertionError e) {
1754                 err = e;
1755             }
1756         }
1757
1758         private void makeAttempt() {
1759             try {
1760                 Semaphore sem = new Semaphore(0);
1761
1762                 LockCallback cb = new LockCallback() {
1763                     @Override
1764                     public void lockAvailable(Lock lock) {
1765                         sem.release();
1766                     }
1767
1768                     @Override
1769                     public void lockUnavailable(Lock lock) {
1770                         sem.release();
1771                     }
1772                 };
1773
1774                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1775
1776                 // wait for callback, whether available or unavailable
1777                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1778                 if (!lock.isActive()) {
1779                     return;
1780                 }
1781
1782                 nsuccesses.incrementAndGet();
1783
1784                 assertEquals(1, nactive.incrementAndGet());
1785
1786                 lock.extend(HOLD_SEC2, cb);
1787                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1788                 assertTrue(lock.isActive());
1789
1790                 // decrement BEFORE free()
1791                 nactive.decrementAndGet();
1792
1793                 assertTrue(lock.free());
1794                 assertTrue(lock.isUnavailable());
1795
1796             } catch (InterruptedException e) {
1797                 Thread.currentThread().interrupt();
1798                 throw new AssertionError("interrupted", e);
1799             }
1800         }
1801     }
1802 }