0ba92c515fb5afb653e6e21adc13b5149896cd32
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.distributed.locking;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32 import static org.mockito.Matchers.any;
33 import static org.mockito.Matchers.anyLong;
34 import static org.mockito.Matchers.eq;
35 import static org.mockito.Mockito.doThrow;
36 import static org.mockito.Mockito.mock;
37 import static org.mockito.Mockito.never;
38 import static org.mockito.Mockito.times;
39 import static org.mockito.Mockito.verify;
40 import static org.mockito.Mockito.when;
41
42 import java.io.ByteArrayInputStream;
43 import java.io.ByteArrayOutputStream;
44 import java.io.ObjectInputStream;
45 import java.io.ObjectOutputStream;
46 import java.sql.Connection;
47 import java.sql.DriverManager;
48 import java.sql.PreparedStatement;
49 import java.sql.ResultSet;
50 import java.sql.SQLException;
51 import java.sql.SQLTransientException;
52 import java.util.ArrayList;
53 import java.util.List;
54 import java.util.Properties;
55 import java.util.concurrent.Executors;
56 import java.util.concurrent.RejectedExecutionException;
57 import java.util.concurrent.ScheduledExecutorService;
58 import java.util.concurrent.Semaphore;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.atomic.AtomicBoolean;
61 import java.util.concurrent.atomic.AtomicInteger;
62 import java.util.concurrent.atomic.AtomicReference;
63 import org.apache.commons.dbcp2.BasicDataSource;
64 import org.junit.After;
65 import org.junit.AfterClass;
66 import org.junit.Before;
67 import org.junit.BeforeClass;
68 import org.junit.Test;
69 import org.mockito.ArgumentCaptor;
70 import org.mockito.Mock;
71 import org.mockito.MockitoAnnotations;
72 import org.onap.policy.common.utils.services.OrderedServiceImpl;
73 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
74 import org.onap.policy.drools.core.lock.Lock;
75 import org.onap.policy.drools.core.lock.LockCallback;
76 import org.onap.policy.drools.core.lock.LockState;
77 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
78 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
79 import org.onap.policy.drools.system.PolicyEngine;
80 import org.onap.policy.drools.system.PolicyEngineConstants;
81 import org.powermock.reflect.Whitebox;
82
83 public class DistributedLockManagerTest {
    // delay, in seconds, expected when the expiration check is scheduled at its normal interval
    private static final long EXPIRE_SEC = 900L;
    // shorter delay, in seconds, expected when the check is rescheduled after a DB failure
    private static final long RETRY_SEC = 60L;
    // name of the policy-engine field that is read and overwritten via Whitebox
    private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
    // host and owner values used to make a DB record look like it belongs to someone else
    private static final String OTHER_HOST = "other-host";
    private static final String OTHER_OWNER = "other-owner";
    // message attached to exceptions that the tests throw on purpose
    private static final String EXPECTED_EXCEPTION = "expected exception";
    // in-memory H2 database; the "pooling" schema is created on connect
    private static final String DB_CONNECTION =
                    "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
    private static final String DB_USER = "user";
    private static final String DB_PASSWORD = "password";
    // owner key and resource identifiers used when requesting locks
    private static final String OWNER_KEY = "my key";
    private static final String RESOURCE = "my resource";
    private static final String RESOURCE2 = "my resource #2";
    private static final String RESOURCE3 = "my resource #3";
    private static final String RESOURCE4 = "my resource #4";
    private static final String RESOURCE5 = "my resource #5";
    // hold times passed to lock and extend requests
    private static final int HOLD_SEC = 100;
    private static final int HOLD_SEC2 = 120;
    // NOTE(review): not referenced by the tests visible here - presumably used by a
    // multi-threaded test further down; confirm
    private static final int MAX_THREADS = 5;
    private static final int MAX_LOOPS = 100;
    // argument for InvalidDbLockingFeature; presumably selects whether the simulated
    // SQL failure is transient or permanent - confirm against that class
    private static final boolean TRANSIENT = true;
    private static final boolean PERMANENT = false;

    // number of execute() calls before the first lock attempt
    private static final int PRE_LOCK_EXECS = 1;

    // number of execute() calls before the first schedule attempt
    private static final int PRE_SCHED_EXECS = 1;

    // connection shared by all tests; opened before the class runs and closed afterwards
    private static Connection conn = null;
    // executor originally held by the policy engine, restored after the class runs
    private static ScheduledExecutorService saveExec;
    // real thread pool installed into the policy engine while the class runs
    private static ScheduledExecutorService realExec;

    // executor handed to the feature via engine.getExecutorService()
    @Mock
    private ScheduledExecutorService exsvc;

    @Mock
    private LockCallback callback;

    @Mock
    private BasicDataSource datasrc;

    @Mock
    private PolicyEngine engine;

    private DistributedLock lock;

    // counters reset to zero before every test
    private AtomicInteger nactive;
    private AtomicInteger nsuccesses;
    // feature under test; replaced by individual tests and stopped in tearDown()
    private DistributedLockManager feature;
134
135
136     /**
137      * Configures the location of the property files and creates the DB.
138      *
139      * @throws SQLException if the DB cannot be created
140      */
141     @BeforeClass
142     public static void setUpBeforeClass() throws SQLException {
143         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
144
145         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
146
147         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
148                         + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
149                         + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
150             createStmt.executeUpdate();
151         }
152
153         saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
154
155         realExec = Executors.newScheduledThreadPool(3);
156         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
157     }
158
159     /**
160      * Restores static fields.
161      */
162     @AfterClass
163     public static void tearDownAfterClass() throws SQLException {
164         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
165         realExec.shutdown();
166         conn.close();
167     }
168
169     /**
170      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
171      * tasks.
172      *
173      * @throws SQLException if the lock records cannot be deleted from the DB
174      */
175     @Before
176     public void setUp() throws SQLException {
177         MockitoAnnotations.initMocks(this);
178
179         nactive = new AtomicInteger(0);
180         nsuccesses = new AtomicInteger(0);
181
182         cleanDb();
183
184         when(engine.getExecutorService()).thenReturn(exsvc);
185
186         feature = new MyLockingFeature(true);
187     }
188
    /**
     * Stops the current feature, if there is one, and removes all lock records.
     *
     * @throws SQLException if the lock records cannot be deleted from the DB
     */
    @After
    public void tearDown() throws SQLException {
        shutdownFeature();
        cleanDb();
    }
194
195     private void cleanDb() throws SQLException {
196         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
197             stmt.executeUpdate();
198         }
199     }
200
201     private void shutdownFeature() {
202         if (feature != null) {
203             feature.afterStop(engine);
204             feature = null;
205         }
206     }
207
208     /**
209      * Tests that the feature is found in the expected service sets.
210      */
211     @Test
212     public void testServiceApis() {
213         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
214                         .anyMatch(obj -> obj instanceof DistributedLockManager));
215     }
216
217     /**
218      * Tests constructor() when properties are invalid.
219      */
220     @Test
221     public void testDistributedLockManagerInvalidProperties() {
222         // use properties containing an invalid value
223         Properties props = new Properties();
224         props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc");
225
226         feature = new MyLockingFeature(false) {
227             @Override
228             protected Properties getProperties(String fileName) {
229                 return props;
230             }
231         };
232
233         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
234     }
235
236     @Test
237     public void testGetSequenceNumber() {
238         assertEquals(1000, feature.getSequenceNumber());
239     }
240
241     @Test
242     public void testStartableApi() {
243         assertTrue(feature.isAlive());
244         assertTrue(feature.start());
245         assertTrue(feature.stop());
246         feature.shutdown();
247
248         // above should have had no effect
249         assertTrue(feature.isAlive());
250
251         feature.afterStop(engine);
252         assertFalse(feature.isAlive());
253     }
254
255     @Test
256     public void testLockApi() {
257         assertFalse(feature.isLocked());
258         assertTrue(feature.lock());
259         assertTrue(feature.unlock());
260     }
261
262     @Test
263     public void testBeforeCreateLockManager() {
264         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
265     }
266
267     /**
268      * Tests beforeCreate(), when getProperties() throws a runtime exception.
269      */
270     @Test
271     public void testBeforeCreateLockManagerEx() {
272         shutdownFeature();
273
274         feature = new MyLockingFeature(false) {
275             @Override
276             protected Properties getProperties(String fileName) {
277                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
278             }
279         };
280
281         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
282                         .isInstanceOf(DistributedLockManagerException.class);
283     }
284
285     @Test
286     public void testAfterStart() {
287         // verify that cleanup & expire check are both added to the queue
288         verify(exsvc).execute(any());
289         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
290     }
291
292     /**
293      * Tests afterStart(), when thread pool throws a runtime exception.
294      */
295     @Test
296     public void testAfterStartExInThreadPool() {
297         shutdownFeature();
298
299         feature = new MyLockingFeature(false);
300
301         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
302                         .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION));
303
304         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
305     }
306
307     @Test
308     public void testDeleteExpiredDbLocks() throws SQLException {
309         // add records: two expired, one not
310         insertRecord(RESOURCE, feature.getUuidString(), -1);
311         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
312         insertRecord(RESOURCE3, OTHER_OWNER, 0);
313         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
314
315         // get the clean-up function and execute it
316         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
317         verify(exsvc).execute(captor.capture());
318
319         long tbegin = System.currentTimeMillis();
320         Runnable action = captor.getValue();
321         action.run();
322
323         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
324         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
325         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
326         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
327
328         assertEquals(2, getRecordCount());
329     }
330
331     /**
332      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
333      *
334      * @throws SQLException if an error occurs
335      */
336     @Test
337     public void testDeleteExpiredDbLocksEx() throws SQLException {
338         feature = new InvalidDbLockingFeature(TRANSIENT);
339
340         // get the clean-up function and execute it
341         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
342         verify(exsvc).execute(captor.capture());
343
344         Runnable action = captor.getValue();
345
346         // should not throw an exception
347         action.run();
348     }
349
350     @Test
351     public void testAfterStop() {
352         shutdownFeature();
353
354         feature = new DistributedLockManager();
355
356         // shutdown without calling afterStart()
357
358         shutdownFeature();
359     }
360
361     /**
362      * Tests afterStop(), when the data source throws an exception when close() is called.
363      *
364      * @throws SQLException if an error occurs
365      */
366     @Test
367     public void testAfterStopEx() throws SQLException {
368         shutdownFeature();
369
370         // use a data source that throws an exception when closed
371         feature = new InvalidDbLockingFeature(TRANSIENT);
372
373         shutdownFeature();
374     }
375
376     @Test
377     public void testCreateLock() throws SQLException {
378         verify(exsvc).execute(any());
379
380         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
381         assertTrue(lock.isWaiting());
382
383         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
384
385         // this lock should fail
386         LockCallback callback2 = mock(LockCallback.class);
387         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
388         assertTrue(lock2.isUnavailable());
389         verify(callback2, never()).lockAvailable(lock2);
390         verify(callback2).lockUnavailable(lock2);
391
392         // this should fail, too
393         LockCallback callback3 = mock(LockCallback.class);
394         DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
395         assertTrue(lock3.isUnavailable());
396         verify(callback3, never()).lockAvailable(lock3);
397         verify(callback3).lockUnavailable(lock3);
398
399         // no change to first
400         assertTrue(lock.isWaiting());
401
402         // no callbacks to the first lock
403         verify(callback, never()).lockAvailable(lock);
404         verify(callback, never()).lockUnavailable(lock);
405
406         assertTrue(lock.isWaiting());
407         assertEquals(0, getRecordCount());
408
409         runLock(0, 0);
410         assertTrue(lock.isActive());
411         assertEquals(1, getRecordCount());
412
413         verify(callback).lockAvailable(lock);
414         verify(callback, never()).lockUnavailable(lock);
415
416         // this should succeed
417         DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
418         assertTrue(lock4.isWaiting());
419
420         // after running checker, original records should still remain
421         runChecker(0, 0, EXPIRE_SEC);
422         assertEquals(1, getRecordCount());
423         verify(callback, never()).lockUnavailable(lock);
424     }
425
426     /**
427      * Tests lock() when the feature is not the latest instance.
428      */
429     @Test
430     public void testCreateLockNotLatestInstance() {
431         DistributedLockManager.setLatestInstance(null);
432
433         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
434         assertTrue(lock.isUnavailable());
435         verify(callback, never()).lockAvailable(any());
436         verify(callback).lockUnavailable(lock);
437     }
438
439     @Test
440     public void testCheckExpired() throws SQLException {
441         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
442         runLock(0, 0);
443
444         LockCallback callback2 = mock(LockCallback.class);
445         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
446         runLock(1, 0);
447
448         LockCallback callback3 = mock(LockCallback.class);
449         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
450         runLock(2, 0);
451
452         LockCallback callback4 = mock(LockCallback.class);
453         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
454         runLock(3, 0);
455
456         LockCallback callback5 = mock(LockCallback.class);
457         final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
458         runLock(4, 0);
459
460         assertEquals(5, getRecordCount());
461
462         // expire one record
463         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
464
465         // change host of another record
466         updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
467
468         // change uuid of another record
469         updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
470
471
472         // run the checker
473         runChecker(0, 0, EXPIRE_SEC);
474
475
476         // check lock states
477         assertTrue(lock.isUnavailable());
478         assertTrue(lock2.isActive());
479         assertTrue(lock3.isUnavailable());
480         assertTrue(lock4.isActive());
481         assertTrue(lock5.isUnavailable());
482
483         // allow callbacks
484         runLock(5, 2);
485         runLock(6, 1);
486         runLock(7, 0);
487         verify(callback).lockUnavailable(lock);
488         verify(callback3).lockUnavailable(lock3);
489         verify(callback5).lockUnavailable(lock5);
490
491         verify(callback2, never()).lockUnavailable(lock2);
492         verify(callback4, never()).lockUnavailable(lock4);
493
494
495         // another check should have been scheduled, with the normal interval
496         runChecker(1, 0, EXPIRE_SEC);
497     }
498
499     /**
500      * Tests checkExpired(), when schedule() throws an exception.
501      */
502     @Test
503     public void testCheckExpiredExecRejected() {
504         // arrange for execution to be rejected
505         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
506                         .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
507
508         runChecker(0, 0, EXPIRE_SEC);
509     }
510
511     /**
512      * Tests checkExpired(), when getConnection() throws an exception.
513      */
514     @Test
515     public void testCheckExpiredSqlEx() {
516         // use a data source that throws an exception when getConnection() is called
517         feature = new InvalidDbLockingFeature(TRANSIENT);
518
519         runChecker(0, 0, EXPIRE_SEC);
520
521         // it should have scheduled another check, sooner
522         runChecker(0, 0, RETRY_SEC);
523     }
524
525     @Test
526     public void testExpireLocks() throws SQLException {
527         AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
528
529         feature = new MyLockingFeature(true) {
530             @Override
531             protected BasicDataSource makeDataSource() throws Exception {
532                 // get the real data source
533                 BasicDataSource src2 = super.makeDataSource();
534
535                 when(datasrc.getConnection()).thenAnswer(answer -> {
536                     DistributedLock lck = freeLock.getAndSet(null);
537                     if (lck != null) {
538                         // free it
539                         lck.free();
540
541                         // run its doUnlock
542                         runLock(4, 0);
543                     }
544
545                     return src2.getConnection();
546                 });
547
548                 return datasrc;
549             }
550         };
551
552         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
553         runLock(0, 0);
554
555         LockCallback callback2 = mock(LockCallback.class);
556         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
557         runLock(1, 0);
558
559         LockCallback callback3 = mock(LockCallback.class);
560         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
561         // don't run doLock for lock3 - leave it in the waiting state
562
563         LockCallback callback4 = mock(LockCallback.class);
564         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
565         runLock(3, 0);
566
567         assertEquals(3, getRecordCount());
568
569         // expire one record
570         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
571
572         // arrange to free lock4 while the checker is running
573         freeLock.set(lock4);
574
575         // run the checker
576         runChecker(0, 0, EXPIRE_SEC);
577
578
579         // check lock states
580         assertTrue(lock.isUnavailable());
581         assertTrue(lock2.isActive());
582         assertTrue(lock3.isWaiting());
583         assertTrue(lock4.isUnavailable());
584
585         runLock(5, 0);
586         verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any());
587
588         verify(callback).lockUnavailable(lock);
589         verify(callback2, never()).lockUnavailable(lock2);
590         verify(callback3, never()).lockUnavailable(lock3);
591         verify(callback4, never()).lockUnavailable(lock4);
592     }
593
594     @Test
595     public void testDistributedLockNoArgs() {
596         DistributedLock lock = new DistributedLock();
597         assertNull(lock.getResourceId());
598         assertNull(lock.getOwnerKey());
599         assertNull(lock.getCallback());
600         assertEquals(0, lock.getHoldSec());
601     }
602
603     @Test
604     public void testDistributedLock() {
605         assertThatIllegalArgumentException()
606                         .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
607                         .withMessageContaining("holdSec");
608
609         // should generate no exception
610         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
611     }
612
613     @Test
614     public void testDistributedLockSerializable() throws Exception {
615         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
616         lock = roundTrip(lock);
617
618         assertTrue(lock.isWaiting());
619
620         assertEquals(RESOURCE, lock.getResourceId());
621         assertEquals(OWNER_KEY, lock.getOwnerKey());
622         assertNull(lock.getCallback());
623         assertEquals(HOLD_SEC, lock.getHoldSec());
624     }
625
626     @Test
627     public void testGrant() {
628         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
629         assertFalse(lock.isActive());
630
631         // execute the doLock() call
632         runLock(0, 0);
633
634         assertTrue(lock.isActive());
635
636         // the callback for the lock should have been run in the foreground thread
637         verify(callback).lockAvailable(lock);
638     }
639
640     /**
641      * Tests grant() when the lock is already unavailable.
642      */
643     @Test
644     public void testDistributedLockGrantUnavailable() {
645         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
646         lock.setState(LockState.UNAVAILABLE);
647         lock.grant();
648
649         assertTrue(lock.isUnavailable());
650         verify(callback, never()).lockAvailable(any());
651         verify(callback, never()).lockUnavailable(any());
652     }
653
654     @Test
655     public void testDistributedLockDeny() {
656         // get a lock
657         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
658
659         // get another lock - should fail
660         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
661
662         assertTrue(lock.isUnavailable());
663
664         // the callback for the second lock should have been run in the foreground thread
665         verify(callback).lockUnavailable(lock);
666
667         // should only have a request for the first lock
668         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
669     }
670
671     @Test
672     public void testDistributedLockFree() {
673         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
674
675         assertTrue(lock.free());
676         assertTrue(lock.isUnavailable());
677
678         // run both requests associated with the lock
679         runLock(0, 1);
680         runLock(1, 0);
681
682         // should not have changed state
683         assertTrue(lock.isUnavailable());
684
685         // attempt to free it again
686         assertFalse(lock.free());
687
688         // should not have queued anything else
689         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
690
691         // new lock should succeed
692         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
693         assertTrue(lock2 != lock);
694         assertTrue(lock2.isWaiting());
695     }
696
697     /**
698      * Tests that free() works on a serialized lock with a new feature.
699      *
700      * @throws Exception if an error occurs
701      */
702     @Test
703     public void testDistributedLockFreeSerialized() throws Exception {
704         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
705
706         feature = new MyLockingFeature(true);
707
708         lock = roundTrip(lock);
709         assertTrue(lock.free());
710         assertTrue(lock.isUnavailable());
711     }
712
713     /**
714      * Tests free() on a serialized lock without a feature.
715      *
716      * @throws Exception if an error occurs
717      */
718     @Test
719     public void testDistributedLockFreeNoFeature() throws Exception {
720         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
721
722         DistributedLockManager.setLatestInstance(null);
723
724         lock = roundTrip(lock);
725         assertFalse(lock.free());
726         assertTrue(lock.isUnavailable());
727     }
728
729     /**
730      * Tests the case where the lock is freed and doUnlock called between the call to
731      * isUnavailable() and the call to compute().
732      */
733     @Test
734     public void testDistributedLockFreeUnlocked() {
735         feature = new FreeWithFreeLockingFeature(true);
736
737         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
738
739         assertFalse(lock.free());
740         assertTrue(lock.isUnavailable());
741     }
742
743     /**
744      * Tests the case where the lock is freed, but doUnlock is not completed, between the
745      * call to isUnavailable() and the call to compute().
746      */
747     @Test
748     public void testDistributedLockFreeLockFreed() {
749         feature = new FreeWithFreeLockingFeature(false);
750
751         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
752
753         assertFalse(lock.free());
754         assertTrue(lock.isUnavailable());
755     }
756
757     @Test
758     public void testDistributedLockExtend() {
759         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
760
761         // lock2 should be denied - called back by this thread
762         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
763         verify(callback, never()).lockAvailable(lock2);
764         verify(callback).lockUnavailable(lock2);
765
766         // lock2 will still be denied - called back by this thread
767         lock2.extend(HOLD_SEC, callback);
768         verify(callback, times(2)).lockUnavailable(lock2);
769
770         // force lock2 to be active - should still be denied
771         Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
772         lock2.extend(HOLD_SEC, callback);
773         verify(callback, times(3)).lockUnavailable(lock2);
774
775         assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
776                         .withMessageContaining("holdSec");
777
778         // execute doLock()
779         runLock(0, 0);
780         assertTrue(lock.isActive());
781
782         // now extend the first lock
783         LockCallback callback2 = mock(LockCallback.class);
784         lock.extend(HOLD_SEC2, callback2);
785         assertTrue(lock.isWaiting());
786
787         // execute doExtend()
788         runLock(1, 0);
789         lock.extend(HOLD_SEC2, callback2);
790         assertEquals(HOLD_SEC2, lock.getHoldSec());
791         verify(callback2).lockAvailable(lock);
792         verify(callback2, never()).lockUnavailable(lock);
793     }
794
795     /**
796      * Tests that extend() works on a serialized lock with a new feature.
797      *
798      * @throws Exception if an error occurs
799      */
800     @Test
801     public void testDistributedLockExtendSerialized() throws Exception {
802         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
803
804         // run doLock
805         runLock(0, 0);
806         assertTrue(lock.isActive());
807
808         feature = new MyLockingFeature(true);
809
810         lock = roundTrip(lock);
811         assertTrue(lock.isActive());
812
813         LockCallback scallback = mock(LockCallback.class);
814
815         lock.extend(HOLD_SEC, scallback);
816         assertTrue(lock.isWaiting());
817
818         // run doExtend (in new feature)
819         runLock(0, 0);
820         assertTrue(lock.isActive());
821
822         verify(scallback).lockAvailable(lock);
823         verify(scallback, never()).lockUnavailable(lock);
824     }
825
826     /**
827      * Tests extend() on a serialized lock without a feature.
828      *
829      * @throws Exception if an error occurs
830      */
831     @Test
832     public void testDistributedLockExtendNoFeature() throws Exception {
833         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
834
835         // run doLock
836         runLock(0, 0);
837         assertTrue(lock.isActive());
838
839         DistributedLockManager.setLatestInstance(null);
840
841         lock = roundTrip(lock);
842         assertTrue(lock.isActive());
843
844         LockCallback scallback = mock(LockCallback.class);
845
846         lock.extend(HOLD_SEC, scallback);
847         assertTrue(lock.isUnavailable());
848
849         verify(scallback, never()).lockAvailable(lock);
850         verify(scallback).lockUnavailable(lock);
851     }
852
853     /**
854      * Tests the case where the lock is freed and doUnlock called between the call to
855      * isUnavailable() and the call to compute().
856      */
857     @Test
858     public void testDistributedLockExtendUnlocked() {
859         feature = new FreeWithFreeLockingFeature(true);
860
861         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
862
863         lock.extend(HOLD_SEC2, callback);
864         assertTrue(lock.isUnavailable());
865         verify(callback).lockUnavailable(lock);
866     }
867
868     /**
869      * Tests the case where the lock is freed, but doUnlock is not completed, between the
870      * call to isUnavailable() and the call to compute().
871      */
872     @Test
873     public void testDistributedLockExtendLockFreed() {
874         feature = new FreeWithFreeLockingFeature(false);
875
876         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
877
878         lock.extend(HOLD_SEC2, callback);
879         assertTrue(lock.isUnavailable());
880         verify(callback).lockUnavailable(lock);
881     }
882
883     @Test
884     public void testDistributedLockScheduleRequest() {
885         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
886         runLock(0, 0);
887
888         verify(callback).lockAvailable(lock);
889     }
890
891     @Test
892     public void testDistributedLockRescheduleRequest() throws SQLException {
893         // use a data source that throws an exception when getConnection() is called
894         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
895         feature = invfeat;
896
897         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
898
899         // invoke doLock - should fail and reschedule
900         runLock(0, 0);
901
902         // should still be waiting
903         assertTrue(lock.isWaiting());
904         verify(callback, never()).lockUnavailable(lock);
905
906         // free the lock while doLock is executing
907         invfeat.freeLock = true;
908
909         // try scheduled request - should just invoke doUnlock
910         runSchedule(0, 0);
911
912         // should still be waiting
913         assertTrue(lock.isUnavailable());
914         verify(callback, never()).lockUnavailable(lock);
915
916         // should have scheduled a retry of doUnlock
917         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
918     }
919
920     @Test
921     public void testDistributedLockGetNextRequest() {
922         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
923
924         /*
925          * run doLock. This should cause getNextRequest() to be called twice, once with a
926          * request in the queue, and the second time with request=null.
927          */
928         runLock(0, 0);
929     }
930
931     /**
932      * Tests getNextRequest(), where the same request is still in the queue the second
933      * time it's called.
934      */
935     @Test
936     public void testDistributedLockGetNextRequestSameRequest() {
937         // force reschedule to be invoked
938         feature = new InvalidDbLockingFeature(TRANSIENT);
939
940         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
941
942         /*
943          * run doLock. This should cause getNextRequest() to be called twice, once with a
944          * request in the queue, and the second time with the same request again.
945          */
946         runLock(0, 0);
947
948         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
949     }
950
951     @Test
952     public void testDistributedLockDoRequest() throws SQLException {
953         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
954
955         assertTrue(lock.isWaiting());
956
957         // run doLock via doRequest
958         runLock(0, 0);
959
960         assertTrue(lock.isActive());
961     }
962
963     /**
964      * Tests doRequest(), when doRequest() is already running within another thread.
965      */
966     @Test
967     public void testDistributedLockDoRequestBusy() {
968         /*
969          * this feature will invoke a request in a background thread while it's being run
970          * in a foreground thread.
971          */
972         AtomicBoolean running = new AtomicBoolean(false);
973         AtomicBoolean returned = new AtomicBoolean(false);
974
975         feature = new MyLockingFeature(true) {
976             @Override
977             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
978                             LockCallback callback) {
979                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
980                     private static final long serialVersionUID = 1L;
981
982                     @Override
983                     protected boolean doDbInsert(Connection conn) throws SQLException {
984                         if (running.get()) {
985                             // already inside the thread - don't recurse any further
986                             return super.doDbInsert(conn);
987                         }
988
989                         running.set(true);
990
991                         Thread thread = new Thread(() -> {
992                             // run doLock from within the new thread
993                             runLock(0, 0);
994                         });
995                         thread.setDaemon(true);
996                         thread.start();
997
998                         // wait for the background thread to complete before continuing
999                         try {
1000                             thread.join(5000);
1001                         } catch (InterruptedException ignore) {
1002                             Thread.currentThread().interrupt();
1003                         }
1004
1005                         returned.set(!thread.isAlive());
1006
1007                         return super.doDbInsert(conn);
1008                     }
1009                 };
1010             }
1011         };
1012
1013         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1014
1015         // run doLock
1016         runLock(0, 0);
1017
1018         assertTrue(returned.get());
1019     }
1020
1021     /**
1022      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1023      *
1024      * @throws SQLException if an error occurs
1025      */
1026     @Test
1027     public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1028         // throw run-time exception
1029         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1030
1031         // use a data source that throws an exception when getConnection() is called
1032         feature = new MyLockingFeature(true) {
1033             @Override
1034             protected BasicDataSource makeDataSource() throws Exception {
1035                 return datasrc;
1036             }
1037         };
1038
1039         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1040
1041         // invoke doLock - should NOT reschedule
1042         runLock(0, 0);
1043
1044         assertTrue(lock.isUnavailable());
1045         verify(callback).lockUnavailable(lock);
1046
1047         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1048     }
1049
1050     /**
1051      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1052      * state.
1053      *
1054      * @throws SQLException if an error occurs
1055      */
1056     @Test
1057     public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1058         // throw run-time exception
1059         when(datasrc.getConnection()).thenAnswer(answer -> {
1060             lock.free();
1061             throw new IllegalStateException(EXPECTED_EXCEPTION);
1062         });
1063
1064         // use a data source that throws an exception when getConnection() is called
1065         feature = new MyLockingFeature(true) {
1066             @Override
1067             protected BasicDataSource makeDataSource() throws Exception {
1068                 return datasrc;
1069             }
1070         };
1071
1072         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1073
1074         // invoke doLock - should NOT reschedule
1075         runLock(0, 0);
1076
1077         assertTrue(lock.isUnavailable());
1078         verify(callback, never()).lockUnavailable(lock);
1079
1080         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1081     }
1082
1083     /**
1084      * Tests doRequest() when the retry count gets exhausted.
1085      */
1086     @Test
1087     public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1088         // use a data source that throws an exception when getConnection() is called
1089         feature = new InvalidDbLockingFeature(TRANSIENT);
1090
1091         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1092
1093         // invoke doLock - should fail and reschedule
1094         runLock(0, 0);
1095
1096         // should still be waiting
1097         assertTrue(lock.isWaiting());
1098         verify(callback, never()).lockUnavailable(lock);
1099
1100         // try again, via SCHEDULER - first retry fails
1101         runSchedule(0, 0);
1102
1103         // should still be waiting
1104         assertTrue(lock.isWaiting());
1105         verify(callback, never()).lockUnavailable(lock);
1106
1107         // try again, via SCHEDULER - final retry fails
1108         runSchedule(1, 0);
1109         assertTrue(lock.isUnavailable());
1110
1111         // now callback should have been called
1112         verify(callback).lockUnavailable(lock);
1113     }
1114
1115     /**
1116      * Tests doRequest() when a non-transient DB exception is thrown.
1117      */
1118     @Test
1119     public void testDistributedLockDoRequestNotTransient() {
1120         /*
1121          * use a data source that throws a PERMANENT exception when getConnection() is
1122          * called
1123          */
1124         feature = new InvalidDbLockingFeature(PERMANENT);
1125
1126         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1127
1128         // invoke doLock - should fail
1129         runLock(0, 0);
1130
1131         assertTrue(lock.isUnavailable());
1132         verify(callback).lockUnavailable(lock);
1133
1134         // should not have scheduled anything new
1135         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1136         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1137     }
1138
1139     @Test
1140     public void testDistributedLockDoLock() throws SQLException {
1141         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1142
1143         // invoke doLock - should simply do an insert
1144         long tbegin = System.currentTimeMillis();
1145         runLock(0, 0);
1146
1147         assertEquals(1, getRecordCount());
1148         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1149         verify(callback).lockAvailable(lock);
1150     }
1151
1152     /**
1153      * Tests doLock() when the lock is freed before doLock runs.
1154      *
1155      * @throws SQLException if an error occurs
1156      */
1157     @Test
1158     public void testDistributedLockDoLockFreed() throws SQLException {
1159         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1160
1161         lock.setState(LockState.UNAVAILABLE);
1162
1163         // invoke doLock - should do nothing
1164         runLock(0, 0);
1165
1166         assertEquals(0, getRecordCount());
1167
1168         verify(callback, never()).lockAvailable(lock);
1169     }
1170
1171     /**
1172      * Tests doLock() when a DB exception is thrown.
1173      */
1174     @Test
1175     public void testDistributedLockDoLockEx() {
1176         // use a data source that throws an exception when getConnection() is called
1177         feature = new InvalidDbLockingFeature(PERMANENT);
1178
1179         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1180
1181         // invoke doLock - should simply do an insert
1182         runLock(0, 0);
1183
1184         // lock should have failed due to exception
1185         verify(callback).lockUnavailable(lock);
1186     }
1187
1188     /**
1189      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1190      * to be called.
1191      */
1192     @Test
1193     public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1194         // insert an expired record
1195         insertRecord(RESOURCE, feature.getUuidString(), 0);
1196
1197         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1198
1199         // invoke doLock - should simply do an update
1200         runLock(0, 0);
1201         verify(callback).lockAvailable(lock);
1202     }
1203
1204     /**
1205      * Tests doLock() when a locked record already exists.
1206      */
1207     @Test
1208     public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1209         // insert an expired record
1210         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1211
1212         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1213
1214         // invoke doLock
1215         runLock(0, 0);
1216
1217         // lock should have failed because it's already locked
1218         verify(callback).lockUnavailable(lock);
1219     }
1220
1221     @Test
1222     public void testDistributedLockDoUnlock() throws SQLException {
1223         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1224
1225         // invoke doLock()
1226         runLock(0, 0);
1227
1228         lock.free();
1229
1230         // invoke doUnlock()
1231         long tbegin = System.currentTimeMillis();
1232         runLock(1, 0);
1233
1234         assertEquals(0, getRecordCount());
1235         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1236
1237         assertTrue(lock.isUnavailable());
1238
1239         // no more callbacks should have occurred
1240         verify(callback, times(1)).lockAvailable(lock);
1241         verify(callback, never()).lockUnavailable(lock);
1242     }
1243
1244     /**
1245      * Tests doUnlock() when a DB exception is thrown.
1246      *
1247      * @throws SQLException if an error occurs
1248      */
1249     @Test
1250     public void testDistributedLockDoUnlockEx() throws SQLException {
1251         feature = new InvalidDbLockingFeature(PERMANENT);
1252
1253         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1254
1255         // do NOT invoke doLock() - it will fail without a DB connection
1256
1257         lock.free();
1258
1259         // invoke doUnlock()
1260         runLock(1, 0);
1261
1262         assertTrue(lock.isUnavailable());
1263
1264         // no more callbacks should have occurred
1265         verify(callback, never()).lockAvailable(lock);
1266         verify(callback, never()).lockUnavailable(lock);
1267     }
1268
1269     @Test
1270     public void testDistributedLockDoExtend() throws SQLException {
1271         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1272         runLock(0, 0);
1273
1274         LockCallback callback2 = mock(LockCallback.class);
1275         lock.extend(HOLD_SEC2, callback2);
1276
1277         // call doExtend()
1278         long tbegin = System.currentTimeMillis();
1279         runLock(1, 0);
1280
1281         assertEquals(1, getRecordCount());
1282         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1283
1284         assertTrue(lock.isActive());
1285
1286         // no more callbacks should have occurred
1287         verify(callback).lockAvailable(lock);
1288         verify(callback, never()).lockUnavailable(lock);
1289
1290         // extension should have succeeded
1291         verify(callback2).lockAvailable(lock);
1292         verify(callback2, never()).lockUnavailable(lock);
1293     }
1294
1295     /**
1296      * Tests doExtend() when the lock is freed before doExtend runs.
1297      *
1298      * @throws SQLException if an error occurs
1299      */
1300     @Test
1301     public void testDistributedLockDoExtendFreed() throws SQLException {
1302         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1303         lock.extend(HOLD_SEC2, callback);
1304
1305         lock.setState(LockState.UNAVAILABLE);
1306
1307         // invoke doExtend - should do nothing
1308         runLock(1, 0);
1309
1310         assertEquals(0, getRecordCount());
1311
1312         verify(callback, never()).lockAvailable(lock);
1313     }
1314
1315     /**
1316      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1317      * insert.
1318      *
1319      * @throws SQLException if an error occurs
1320      */
1321     @Test
1322     public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1323         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1324         runLock(0, 0);
1325
1326         LockCallback callback2 = mock(LockCallback.class);
1327         lock.extend(HOLD_SEC2, callback2);
1328
1329         // delete the record so it's forced to re-insert it
1330         cleanDb();
1331
1332         // call doExtend()
1333         long tbegin = System.currentTimeMillis();
1334         runLock(1, 0);
1335
1336         assertEquals(1, getRecordCount());
1337         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1338
1339         assertTrue(lock.isActive());
1340
1341         // no more callbacks should have occurred
1342         verify(callback).lockAvailable(lock);
1343         verify(callback, never()).lockUnavailable(lock);
1344
1345         // extension should have succeeded
1346         verify(callback2).lockAvailable(lock);
1347         verify(callback2, never()).lockUnavailable(lock);
1348     }
1349
1350     /**
1351      * Tests doExtend() when both update and insert fail.
1352      *
1353      * @throws SQLException if an error occurs
1354      */
1355     @Test
1356     public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1357         /*
1358          * this feature will create a lock that returns false when doDbUpdate() is
1359          * invoked, or when doDbInsert() is invoked a second time
1360          */
1361         feature = new MyLockingFeature(true) {
1362             @Override
1363             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1364                             LockCallback callback) {
1365                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1366                     private static final long serialVersionUID = 1L;
1367                     private int ntimes = 0;
1368
1369                     @Override
1370                     protected boolean doDbInsert(Connection conn) throws SQLException {
1371                         if (ntimes++ > 0) {
1372                             return false;
1373                         }
1374
1375                         return super.doDbInsert(conn);
1376                     }
1377
1378                     @Override
1379                     protected boolean doDbUpdate(Connection conn) {
1380                         return false;
1381                     }
1382                 };
1383             }
1384         };
1385
1386         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1387         runLock(0, 0);
1388
1389         LockCallback callback2 = mock(LockCallback.class);
1390         lock.extend(HOLD_SEC2, callback2);
1391
1392         // call doExtend()
1393         runLock(1, 0);
1394
1395         assertTrue(lock.isUnavailable());
1396
1397         // no more callbacks should have occurred
1398         verify(callback).lockAvailable(lock);
1399         verify(callback, never()).lockUnavailable(lock);
1400
1401         // extension should have failed
1402         verify(callback2, never()).lockAvailable(lock);
1403         verify(callback2).lockUnavailable(lock);
1404     }
1405
1406     /**
1407      * Tests doExtend() when an exception occurs.
1408      *
1409      * @throws SQLException if an error occurs
1410      */
1411     @Test
1412     public void testDistributedLockDoExtendEx() throws SQLException {
1413         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1414         runLock(0, 0);
1415
1416         /*
1417          * delete the record and insert one with a different owner, which will cause
1418          * doDbInsert() to throw an exception
1419          */
1420         cleanDb();
1421         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1422
1423         LockCallback callback2 = mock(LockCallback.class);
1424         lock.extend(HOLD_SEC2, callback2);
1425
1426         // call doExtend()
1427         runLock(1, 0);
1428
1429         assertTrue(lock.isUnavailable());
1430
1431         // no more callbacks should have occurred
1432         verify(callback).lockAvailable(lock);
1433         verify(callback, never()).lockUnavailable(lock);
1434
1435         // extension should have failed
1436         verify(callback2, never()).lockAvailable(lock);
1437         verify(callback2).lockUnavailable(lock);
1438     }
1439
1440     @Test
1441     public void testDistributedLockToString() {
1442         String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1443         assertNotNull(text);
1444         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1445     }
1446
1447     @Test
1448     public void testMakeThreadPool() {
1449         // use a REAL feature to test this
1450         feature = new DistributedLockManager();
1451
1452         // this should create a thread pool
1453         feature.beforeCreateLockManager(engine, new Properties());
1454         feature.afterStart(engine);
1455
1456         shutdownFeature();
1457     }
1458
1459     /**
1460      * Performs a multi-threaded test of the locking facility.
1461      *
1462      * @throws InterruptedException if the current thread is interrupted while waiting for
1463      *         the background threads to complete
1464      */
1465     @Test
1466     public void testMultiThreaded() throws InterruptedException {
1467         feature = new DistributedLockManager();
1468         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1469         feature.afterStart(PolicyEngineConstants.getManager());
1470
1471         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1472         for (int x = 0; x < MAX_THREADS; ++x) {
1473             threads.add(new MyThread());
1474         }
1475
1476         threads.forEach(Thread::start);
1477
1478         for (MyThread thread : threads) {
1479             thread.join(6000);
1480             assertFalse(thread.isAlive());
1481         }
1482
1483         for (MyThread thread : threads) {
1484             if (thread.err != null) {
1485                 throw thread.err;
1486             }
1487         }
1488
1489         assertTrue(nsuccesses.get() > 0);
1490     }
1491
1492     private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1493                     boolean waitForLock) {
1494         return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1495     }
1496
1497     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1498         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1499         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1500             oos.writeObject(lock);
1501         }
1502
1503         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1504         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1505             return (DistributedLock) ois.readObject();
1506         }
1507     }
1508
1509     /**
1510      * Runs the checkExpired() action.
1511      *
1512      * @param nskip number of actions in the work queue to skip
1513      * @param nadditional number of additional actions that appear in the work queue
1514      *        <i>after</i> the checkExpired action
1515      * @param schedSec number of seconds for which the checker should have been scheduled
1516      */
1517     private void runChecker(int nskip, int nadditional, long schedSec) {
1518         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1519         verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1520         Runnable action = captor.getAllValues().get(nskip);
1521         action.run();
1522     }
1523
1524     /**
1525      * Runs a lock action (e.g., doLock, doUnlock).
1526      *
1527      * @param nskip number of actions in the work queue to skip
1528      * @param nadditional number of additional actions that appear in the work queue
1529      *        <i>after</i> the desired action
1530      */
1531     void runLock(int nskip, int nadditional) {
1532         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1533         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1534
1535         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1536         action.run();
1537     }
1538
1539     /**
1540      * Runs a scheduled action (e.g., "retry" action).
1541      *
1542      * @param nskip number of actions in the work queue to skip
1543      * @param nadditional number of additional actions that appear in the work queue
1544      *        <i>after</i> the desired action
1545      */
1546     void runSchedule(int nskip, int nadditional) {
1547         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1548         verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1549
1550         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1551         action.run();
1552     }
1553
1554     /**
1555      * Gets a count of the number of lock records in the DB.
1556      *
1557      * @return the number of lock records in the DB
1558      * @throws SQLException if an error occurs accessing the DB
1559      */
1560     private int getRecordCount() throws SQLException {
1561         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1562                         ResultSet result = stmt.executeQuery()) {
1563
1564             if (result.next()) {
1565                 return result.getInt(1);
1566
1567             } else {
1568                 return 0;
1569             }
1570         }
1571     }
1572
1573     /**
1574      * Determines if there is a record for the given resource whose expiration time is in
1575      * the expected range.
1576      *
1577      * @param resourceId ID of the resource of interest
1578      * @param uuidString UUID string of the owner
1579      * @param holdSec seconds for which the lock was to be held
1580      * @param tbegin earliest time, in milliseconds, at which the record could have been
1581      *        inserted into the DB
1582      * @return {@code true} if a record is found, {@code false} otherwise
1583      * @throws SQLException if an error occurs accessing the DB
1584      */
1585     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1586         try (PreparedStatement stmt =
1587                         conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1588                                         + " WHERE resourceId=? AND host=? AND owner=?")) {
1589
1590             stmt.setString(1, resourceId);
1591             stmt.setString(2, feature.getHostName());
1592             stmt.setString(3, uuidString);
1593
1594             try (ResultSet result = stmt.executeQuery()) {
1595                 if (result.next()) {
1596                     int remaining = result.getInt(1);
1597                     long maxDiff = System.currentTimeMillis() - tbegin;
1598                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1599
1600                 } else {
1601                     return false;
1602                 }
1603             }
1604         }
1605     }
1606
1607     /**
1608      * Inserts a record into the DB.
1609      *
1610      * @param resourceId ID of the resource of interest
1611      * @param uuidString UUID string of the owner
1612      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1613      * @throws SQLException if an error occurs accessing the DB
1614      */
1615     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1616         this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
1617     }
1618
1619     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1620                     throws SQLException {
1621         try (PreparedStatement stmt =
1622                         conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1623                                         + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1624
1625             stmt.setString(1, resourceId);
1626             stmt.setString(2, hostName);
1627             stmt.setString(3, uuidString);
1628             stmt.setInt(4, expireOffset);
1629
1630             assertEquals(1, stmt.executeUpdate());
1631         }
1632     }
1633
1634     /**
1635      * Updates a record in the DB.
1636      *
1637      * @param resourceId ID of the resource of interest
1638      * @param newUuid UUID string of the <i>new</i> owner
1639      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1640      * @throws SQLException if an error occurs accessing the DB
1641      */
1642     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1643         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1644                         + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1645
1646             stmt.setString(1, newHost);
1647             stmt.setString(2, newUuid);
1648             stmt.setInt(3, expireOffset);
1649             stmt.setString(4, resourceId);
1650
1651             assertEquals(1, stmt.executeUpdate());
1652         }
1653     }
1654
1655     /**
1656      * Feature that uses <i>exsvc</i> to execute requests.
1657      */
1658     private class MyLockingFeature extends DistributedLockManager {
1659
1660         public MyLockingFeature(boolean init) {
1661             shutdownFeature();
1662
1663             exsvc = mock(ScheduledExecutorService.class);
1664             when(engine.getExecutorService()).thenReturn(exsvc);
1665
1666             if (init) {
1667                 beforeCreateLockManager(engine, new Properties());
1668                 afterStart(engine);
1669             }
1670         }
1671     }
1672
1673     /**
1674      * Feature whose data source all throws exceptions.
1675      */
1676     private class InvalidDbLockingFeature extends MyLockingFeature {
1677         private boolean isTransient;
1678         private boolean freeLock = false;
1679
1680         public InvalidDbLockingFeature(boolean isTransient) {
1681             // pass "false" because we have to set the error code BEFORE calling
1682             // afterStart()
1683             super(false);
1684
1685             this.isTransient = isTransient;
1686
1687             this.beforeCreateLockManager(engine, new Properties());
1688             this.afterStart(engine);
1689         }
1690
1691         @Override
1692         protected BasicDataSource makeDataSource() throws Exception {
1693             when(datasrc.getConnection()).thenAnswer(answer -> {
1694                 if (freeLock) {
1695                     freeLock = false;
1696                     lock.free();
1697                 }
1698
1699                 throw makeEx();
1700             });
1701
1702             doThrow(makeEx()).when(datasrc).close();
1703
1704             return datasrc;
1705         }
1706
1707         private SQLException makeEx() {
1708             if (isTransient) {
1709                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1710
1711             } else {
1712                 return new SQLException(EXPECTED_EXCEPTION);
1713             }
1714         }
1715     }
1716
1717     /**
1718      * Feature whose locks free themselves while free() is already running.
1719      */
1720     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1721         private boolean relock;
1722
1723         public FreeWithFreeLockingFeature(boolean relock) {
1724             super(true);
1725             this.relock = relock;
1726         }
1727
1728         @Override
1729         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1730                         LockCallback callback) {
1731
1732             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1733                 private static final long serialVersionUID = 1L;
1734                 private boolean checked = false;
1735
1736                 @Override
1737                 public boolean isUnavailable() {
1738                     if (checked) {
1739                         return super.isUnavailable();
1740                     }
1741
1742                     checked = true;
1743
1744                     // release and relock
1745                     free();
1746
1747                     if (relock) {
1748                         // run doUnlock
1749                         runLock(1, 0);
1750
1751                         // relock it
1752                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1753                     }
1754
1755                     return false;
1756                 }
1757             };
1758         }
1759     }
1760
1761     /**
1762      * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1763      * extend it, and then unlock it.
1764      */
1765     private class MyThread extends Thread {
1766         AssertionError err = null;
1767
1768         public MyThread() {
1769             setDaemon(true);
1770         }
1771
1772         @Override
1773         public void run() {
1774             try {
1775                 for (int x = 0; x < MAX_LOOPS; ++x) {
1776                     makeAttempt();
1777                 }
1778
1779             } catch (AssertionError e) {
1780                 err = e;
1781             }
1782         }
1783
1784         private void makeAttempt() {
1785             try {
1786                 Semaphore sem = new Semaphore(0);
1787
1788                 LockCallback cb = new LockCallback() {
1789                     @Override
1790                     public void lockAvailable(Lock lock) {
1791                         sem.release();
1792                     }
1793
1794                     @Override
1795                     public void lockUnavailable(Lock lock) {
1796                         sem.release();
1797                     }
1798                 };
1799
1800                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1801
1802                 // wait for callback, whether available or unavailable
1803                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1804                 if (!lock.isActive()) {
1805                     return;
1806                 }
1807
1808                 nsuccesses.incrementAndGet();
1809
1810                 assertEquals(1, nactive.incrementAndGet());
1811
1812                 lock.extend(HOLD_SEC2, cb);
1813                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1814                 assertTrue(lock.isActive());
1815
1816                 // decrement BEFORE free()
1817                 nactive.decrementAndGet();
1818
1819                 assertTrue(lock.free());
1820                 assertTrue(lock.isUnavailable());
1821
1822             } catch (InterruptedException e) {
1823                 Thread.currentThread().interrupt();
1824                 throw new AssertionError("interrupted", e);
1825             }
1826         }
1827     }
1828 }