0c0dbf11aa5bb6cb97353a77b320488b5388a684
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.distributed.locking;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNotNull;
30 import static org.junit.Assert.assertNotSame;
31 import static org.junit.Assert.assertNull;
32 import static org.junit.Assert.assertSame;
33 import static org.junit.Assert.assertTrue;
34 import static org.mockito.ArgumentMatchers.any;
35 import static org.mockito.ArgumentMatchers.anyBoolean;
36 import static org.mockito.ArgumentMatchers.anyLong;
37 import static org.mockito.ArgumentMatchers.eq;
38 import static org.mockito.Mockito.doThrow;
39 import static org.mockito.Mockito.mock;
40 import static org.mockito.Mockito.never;
41 import static org.mockito.Mockito.times;
42 import static org.mockito.Mockito.verify;
43 import static org.mockito.Mockito.when;
44
45 import java.io.ByteArrayInputStream;
46 import java.io.ByteArrayOutputStream;
47 import java.io.ObjectInputStream;
48 import java.io.ObjectOutputStream;
49 import java.sql.Connection;
50 import java.sql.DriverManager;
51 import java.sql.PreparedStatement;
52 import java.sql.ResultSet;
53 import java.sql.SQLException;
54 import java.sql.SQLTransientException;
55 import java.util.ArrayList;
56 import java.util.List;
57 import java.util.Properties;
58 import java.util.concurrent.Executors;
59 import java.util.concurrent.RejectedExecutionException;
60 import java.util.concurrent.ScheduledExecutorService;
61 import java.util.concurrent.ScheduledFuture;
62 import java.util.concurrent.Semaphore;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.atomic.AtomicBoolean;
65 import java.util.concurrent.atomic.AtomicInteger;
66 import java.util.concurrent.atomic.AtomicReference;
67 import org.apache.commons.dbcp2.BasicDataSource;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.kie.api.runtime.KieSession;
74 import org.mockito.ArgumentCaptor;
75 import org.mockito.Mock;
76 import org.mockito.MockitoAnnotations;
77 import org.onap.policy.common.utils.services.OrderedServiceImpl;
78 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
79 import org.onap.policy.drools.core.PolicySession;
80 import org.onap.policy.drools.core.lock.Lock;
81 import org.onap.policy.drools.core.lock.LockCallback;
82 import org.onap.policy.drools.core.lock.LockState;
83 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
84 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
85 import org.onap.policy.drools.system.PolicyEngine;
86 import org.onap.policy.drools.system.PolicyEngineConstants;
87 import org.powermock.reflect.Whitebox;
88
89 public class DistributedLockManagerTest {
    // delay, in seconds, that the tests pass to runChecker() when a normally scheduled
    // expiration check is expected
    private static final long EXPIRE_SEC = 900L;
    // delay, in seconds, that the tests pass to runChecker() when a sooner re-check is
    // expected after a DB failure
    private static final long RETRY_SEC = 60L;
    // name of the PolicyEngine field that is read and written via Whitebox
    private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
    // host and owner values written into DB records so they differ from this feature's own
    private static final String OTHER_HOST = "other-host";
    private static final String OTHER_OWNER = "other-owner";
    // message placed in exceptions that the tests throw on purpose
    private static final String EXPECTED_EXCEPTION = "expected exception";
    // JDBC settings handed to DriverManager.getConnection() for the H2 in-memory DB
    private static final String DB_CONNECTION =
                    "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
    private static final String DB_USER = "user";
    private static final String DB_PASSWORD = "password";
    // owner key and resource identifiers passed when requesting locks
    private static final String OWNER_KEY = "my key";
    private static final String RESOURCE = "my resource";
    private static final String RESOURCE2 = "my resource #2";
    private static final String RESOURCE3 = "my resource #3";
    private static final String RESOURCE4 = "my resource #4";
    private static final String RESOURCE5 = "my resource #5";
    // hold times passed when requesting and extending locks
    private static final int HOLD_SEC = 100;
    private static final int HOLD_SEC2 = 120;
    // NOTE(review): MAX_THREADS and MAX_LOOPS are not referenced in the visible part of
    // the file - presumably used by a multi-threaded test further down; confirm
    private static final int MAX_THREADS = 5;
    private static final int MAX_LOOPS = 100;
    // constructor arguments for InvalidDbLockingFeature
    // NOTE(review): presumably select whether the simulated SQL failure is transient - confirm
    private static final boolean TRANSIENT = true;
    private static final boolean PERMANENT = false;

    // number of execute() calls before the first lock attempt
    private static final int PRE_LOCK_EXECS = 1;

    // number of execute() calls before the first schedule attempt
    private static final int PRE_SCHED_EXECS = 1;

    // connection shared by all tests; opened in setUpBeforeClass(), closed in tearDownAfterClass()
    private static Connection conn = null;
    // executor found in the policy engine before the tests ran; restored in tearDownAfterClass()
    private static ScheduledExecutorService saveExec;
    // real thread pool created in setUpBeforeClass() and shut down in tearDownAfterClass()
    private static ScheduledExecutorService realExec;

    // mocks, (re)initialized by MockitoAnnotations.initMocks() in setUp()
    @Mock
    private PolicyEngine engine;

    @Mock
    private KieSession kieSess;

    @Mock
    private ScheduledExecutorService exsvc;

    @Mock
    private ScheduledFuture<?> checker;

    @Mock
    private LockCallback callback;

    @Mock
    private BasicDataSource datasrc;

    // lock under test, assigned by the individual test methods
    private DistributedLock lock;
    // session whose insertDrools() runs the inserted object immediately; built in setUp()
    private PolicySession session;

    // counters reset to zero in setUp()
    private AtomicInteger nactive;
    private AtomicInteger nsuccesses;
    // feature under test; stopped and cleared by shutdownFeature()
    private DistributedLockManager feature;
147
148
149     /**
150      * Configures the location of the property files and creates the DB.
151      *
152      * @throws SQLException if the DB cannot be created
153      */
154     @BeforeClass
155     public static void setUpBeforeClass() throws SQLException {
156         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
157
158         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
159
160         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
161                         + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
162                         + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
163             createStmt.executeUpdate();
164         }
165
166         saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
167
168         realExec = Executors.newScheduledThreadPool(3);
169     }
170
171     /**
172      * Restores static fields.
173      */
174     @AfterClass
175     public static void tearDownAfterClass() throws SQLException {
176         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
177         realExec.shutdown();
178         conn.close();
179     }
180
181     /**
182      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
183      * tasks.
184      *
185      * @throws SQLException if the lock records cannot be deleted from the DB
186      */
187     @Before
188     public void setUp() throws SQLException {
189         MockitoAnnotations.initMocks(this);
190
191         // grant() and deny() calls will come through here and be immediately executed
192         session = new PolicySession(null, null, kieSess) {
193             @Override
194             public void insertDrools(Object object) {
195                 ((Runnable) object).run();
196             }
197         };
198
199         session.setPolicySession();
200
201         nactive = new AtomicInteger(0);
202         nsuccesses = new AtomicInteger(0);
203
204         cleanDb();
205
206         feature = new MyLockingFeature(true);
207     }
208
209     @After
210     public void tearDown() throws SQLException {
211         shutdownFeature();
212         cleanDb();
213     }
214
215     private void cleanDb() throws SQLException {
216         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
217             stmt.executeUpdate();
218         }
219     }
220
221     private void shutdownFeature() {
222         if (feature != null) {
223             feature.afterStop(engine);
224             feature = null;
225         }
226     }
227
228     /**
229      * Tests that the feature is found in the expected service sets.
230      */
231     @Test
232     public void testServiceApis() {
233         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
234                         .anyMatch(obj -> obj instanceof DistributedLockManager));
235     }
236
237     @Test
238     public void testGetSequenceNumber() {
239         assertEquals(1000, feature.getSequenceNumber());
240     }
241
242     @Test
243     public void testBeforeCreateLockManager() {
244         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
245     }
246
247     /**
248      * Tests beforeCreate(), when getProperties() throws a runtime exception.
249      */
250     @Test
251     public void testBeforeCreateLockManagerEx() {
252         shutdownFeature();
253
254         feature = new MyLockingFeature(false) {
255             @Override
256             protected Properties getProperties(String fileName) {
257                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
258             }
259         };
260
261         Properties props = new Properties();
262         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, props))
263                         .isInstanceOf(DistributedLockManagerException.class);
264     }
265
266     @Test
267     public void testAfterStart() {
268         // verify that cleanup & expire check are both added to the queue
269         verify(exsvc).execute(any());
270         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
271     }
272
273     /**
274      * Tests afterStart(), when thread pool throws a runtime exception.
275      */
276     @Test
277     public void testAfterStartExInThreadPool() {
278         shutdownFeature();
279
280         feature = new MyLockingFeature(false);
281
282         doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
283
284         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
285     }
286
287     @Test
288     public void testDeleteExpiredDbLocks() throws SQLException {
289         // add records: two expired, one not
290         insertRecord(RESOURCE, feature.getUuidString(), -1);
291         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
292         insertRecord(RESOURCE3, OTHER_OWNER, 0);
293         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
294
295         // get the clean-up function and execute it
296         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
297         verify(exsvc).execute(captor.capture());
298
299         long tbegin = System.currentTimeMillis();
300         Runnable action = captor.getValue();
301         action.run();
302
303         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
304         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
305         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
306         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
307
308         assertEquals(2, getRecordCount());
309     }
310
311     /**
312      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
313      *
314      * @throws SQLException if an error occurs
315      */
316     @Test
317     public void testDeleteExpiredDbLocksEx() throws SQLException {
318         feature = new InvalidDbLockingFeature(TRANSIENT);
319
320         // get the clean-up function and execute it
321         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
322         verify(exsvc).execute(captor.capture());
323
324         Runnable action = captor.getValue();
325
326         // should not throw an exception
327         action.run();
328     }
329
330     @Test
331     public void testAfterStop() {
332         shutdownFeature();
333         verify(checker).cancel(anyBoolean());
334
335         feature = new DistributedLockManager();
336
337         // shutdown without calling afterStart()
338
339         shutdownFeature();
340     }
341
342     /**
343      * Tests afterStop(), when the data source throws an exception when close() is called.
344      *
345      * @throws SQLException if an error occurs
346      */
347     @Test
348     public void testAfterStopEx() throws SQLException {
349         shutdownFeature();
350
351         // use a data source that throws an exception when closed
352         feature = new InvalidDbLockingFeature(TRANSIENT);
353
354         assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
355     }
356
357     @Test
358     public void testCreateLock() throws SQLException {
359         verify(exsvc).execute(any());
360
361         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
362         assertTrue(lock.isWaiting());
363
364         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
365
366         // this lock should fail
367         LockCallback callback2 = mock(LockCallback.class);
368         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
369         assertTrue(lock2.isUnavailable());
370         verify(callback2, never()).lockAvailable(lock2);
371         verify(callback2).lockUnavailable(lock2);
372
373         // this should fail, too
374         LockCallback callback3 = mock(LockCallback.class);
375         DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
376         assertTrue(lock3.isUnavailable());
377         verify(callback3, never()).lockAvailable(lock3);
378         verify(callback3).lockUnavailable(lock3);
379
380         // no change to first
381         assertTrue(lock.isWaiting());
382
383         // no callbacks to the first lock
384         verify(callback, never()).lockAvailable(lock);
385         verify(callback, never()).lockUnavailable(lock);
386
387         assertTrue(lock.isWaiting());
388         assertEquals(0, getRecordCount());
389
390         runLock(0, 0);
391         assertTrue(lock.isActive());
392         assertEquals(1, getRecordCount());
393
394         verify(callback).lockAvailable(lock);
395         verify(callback, never()).lockUnavailable(lock);
396
397         // this should succeed
398         DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
399         assertTrue(lock4.isWaiting());
400
401         // after running checker, original records should still remain
402         runChecker(0, 0, EXPIRE_SEC);
403         assertEquals(1, getRecordCount());
404         verify(callback, never()).lockUnavailable(lock);
405     }
406
407     /**
408      * Tests createLock() when the feature is not the latest instance.
409      */
410     @Test
411     public void testCreateLockNotLatestInstance() {
412         DistributedLockManager.setLatestInstance(null);
413
414         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
415         assertTrue(lock.isUnavailable());
416         verify(callback, never()).lockAvailable(any());
417         verify(callback).lockUnavailable(lock);
418     }
419
420     @Test
421     public void testCheckExpired() throws SQLException {
422         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
423         runLock(0, 0);
424
425         LockCallback callback2 = mock(LockCallback.class);
426         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
427         runLock(1, 0);
428
429         LockCallback callback3 = mock(LockCallback.class);
430         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
431         runLock(2, 0);
432
433         LockCallback callback4 = mock(LockCallback.class);
434         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
435         runLock(3, 0);
436
437         LockCallback callback5 = mock(LockCallback.class);
438         final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
439         runLock(4, 0);
440
441         assertEquals(5, getRecordCount());
442
443         // expire one record
444         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
445
446         // change host of another record
447         updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
448
449         // change uuid of another record
450         updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
451
452
453         // run the checker
454         runChecker(0, 0, EXPIRE_SEC);
455
456
457         // check lock states
458         assertTrue(lock.isUnavailable());
459         assertTrue(lock2.isActive());
460         assertTrue(lock3.isUnavailable());
461         assertTrue(lock4.isActive());
462         assertTrue(lock5.isUnavailable());
463
464         // allow callbacks
465         runLock(2, 2);
466         runLock(3, 1);
467         runLock(4, 0);
468         verify(callback).lockUnavailable(lock);
469         verify(callback3).lockUnavailable(lock3);
470         verify(callback5).lockUnavailable(lock5);
471
472         verify(callback2, never()).lockUnavailable(lock2);
473         verify(callback4, never()).lockUnavailable(lock4);
474
475
476         // another check should have been scheduled, with the normal interval
477         runChecker(1, 0, EXPIRE_SEC);
478     }
479
480     /**
481      * Tests checkExpired(), when schedule() throws an exception.
482      */
483     @Test
484     public void testCheckExpiredExecRejected() {
485         // arrange for execution to be rejected
486         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
487                         .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
488
489         runChecker(0, 0, EXPIRE_SEC);
490     }
491
492     /**
493      * Tests checkExpired(), when getConnection() throws an exception.
494      */
495     @Test
496     public void testCheckExpiredSqlEx() {
497         // use a data source that throws an exception when getConnection() is called
498         feature = new InvalidDbLockingFeature(TRANSIENT);
499
500         runChecker(0, 0, EXPIRE_SEC);
501
502         // it should have scheduled another check, sooner
503         runChecker(0, 0, RETRY_SEC);
504     }
505
506     /**
507      * Tests checkExpired(), when getConnection() throws an exception and the feature is
508      * no longer alive.
509      */
510     @Test
511     public void testCheckExpiredSqlExFeatureStopped() {
512         // use a data source that throws an exception when getConnection() is called
513         feature = new InvalidDbLockingFeature(TRANSIENT) {
514             @Override
515             protected SQLException makeEx() {
516                 this.stop();
517                 return super.makeEx();
518             }
519         };
520
521         runChecker(0, 0, EXPIRE_SEC);
522
523         // it should NOT have scheduled another check
524         verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
525     }
526
527     @Test
528     public void testExpireLocks() throws SQLException {
529         AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
530
531         feature = new MyLockingFeature(true) {
532             @Override
533             protected BasicDataSource makeDataSource() throws Exception {
534                 // get the real data source
535                 BasicDataSource src2 = super.makeDataSource();
536
537                 when(datasrc.getConnection()).thenAnswer(answer -> {
538                     DistributedLock lck = freeLock.getAndSet(null);
539                     if (lck != null) {
540                         // free it
541                         lck.free();
542
543                         // run its doUnlock
544                         runLock(4, 0);
545                     }
546
547                     return src2.getConnection();
548                 });
549
550                 return datasrc;
551             }
552         };
553
554         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
555         runLock(0, 0);
556
557         LockCallback callback2 = mock(LockCallback.class);
558         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
559         runLock(1, 0);
560
561         LockCallback callback3 = mock(LockCallback.class);
562         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
563         // don't run doLock for lock3 - leave it in the waiting state
564
565         LockCallback callback4 = mock(LockCallback.class);
566         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
567         runLock(3, 0);
568
569         assertEquals(3, getRecordCount());
570
571         // expire one record
572         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
573
574         // arrange to free lock4 while the checker is running
575         freeLock.set(lock4);
576
577         // run the checker
578         runChecker(0, 0, EXPIRE_SEC);
579
580
581         // check lock states
582         assertTrue(lock.isUnavailable());
583         assertTrue(lock2.isActive());
584         assertTrue(lock3.isWaiting());
585         assertTrue(lock4.isUnavailable());
586
587         runLock(4, 0);
588         verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
589
590         verify(callback).lockUnavailable(lock);
591         verify(callback2, never()).lockUnavailable(lock2);
592         verify(callback3, never()).lockUnavailable(lock3);
593         verify(callback4, never()).lockUnavailable(lock4);
594     }
595
596     @Test
597     public void testDistributedLockNoArgs() {
598         DistributedLock lock = new DistributedLock();
599         assertNull(lock.getResourceId());
600         assertNull(lock.getOwnerKey());
601         assertNull(lock.getCallback());
602         assertEquals(0, lock.getHoldSec());
603     }
604
605     @Test
606     public void testDistributedLock() {
607         assertThatIllegalArgumentException()
608                         .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
609                         .withMessageContaining("holdSec");
610
611         // should generate no exception
612         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
613     }
614
615     @Test
616     public void testDistributedLockSerializable() throws Exception {
617         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
618         lock = roundTrip(lock);
619
620         assertTrue(lock.isWaiting());
621
622         assertEquals(RESOURCE, lock.getResourceId());
623         assertEquals(OWNER_KEY, lock.getOwnerKey());
624         assertNull(lock.getCallback());
625         assertEquals(HOLD_SEC, lock.getHoldSec());
626     }
627
628     @Test
629     public void testGrant() {
630         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
631         assertFalse(lock.isActive());
632
633         // execute the doLock() call
634         runLock(0, 0);
635
636         assertTrue(lock.isActive());
637
638         // the callback for the lock should have been run in the foreground thread
639         verify(callback).lockAvailable(lock);
640     }
641
642     @Test
643     public void testDistributedLockDeny() {
644         // get a lock
645         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
646
647         // get another lock - should fail
648         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
649
650         assertTrue(lock.isUnavailable());
651
652         // the callback for the second lock should have been run in the foreground thread
653         verify(callback).lockUnavailable(lock);
654
655         // should only have a request for the first lock
656         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
657     }
658
659     @Test
660     public void testDistributedLockFree() {
661         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
662
663         assertTrue(lock.free());
664         assertTrue(lock.isUnavailable());
665
666         // run both requests associated with the lock
667         runLock(0, 1);
668         runLock(1, 0);
669
670         // should not have changed state
671         assertTrue(lock.isUnavailable());
672
673         // attempt to free it again
674         assertFalse(lock.free());
675
676         // should not have queued anything else
677         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
678
679         // new lock should succeed
680         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
681         assertNotSame(lock2, lock);
682         assertTrue(lock2.isWaiting());
683     }
684
685     /**
686      * Tests that free() works on a serialized lock with a new feature.
687      *
688      * @throws Exception if an error occurs
689      */
690     @Test
691     public void testDistributedLockFreeSerialized() throws Exception {
692         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
693
694         feature = new MyLockingFeature(true);
695
696         lock = roundTrip(lock);
697         assertTrue(lock.free());
698         assertTrue(lock.isUnavailable());
699     }
700
701     /**
702      * Tests free() on a serialized lock without a feature.
703      *
704      * @throws Exception if an error occurs
705      */
706     @Test
707     public void testDistributedLockFreeNoFeature() throws Exception {
708         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
709
710         DistributedLockManager.setLatestInstance(null);
711
712         lock = roundTrip(lock);
713         assertFalse(lock.free());
714         assertTrue(lock.isUnavailable());
715     }
716
717     /**
718      * Tests the case where the lock is freed and doUnlock called between the call to
719      * isUnavailable() and the call to compute().
720      */
721     @Test
722     public void testDistributedLockFreeUnlocked() {
723         feature = new FreeWithFreeLockingFeature(true);
724
725         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
726
727         assertFalse(lock.free());
728         assertTrue(lock.isUnavailable());
729     }
730
731     /**
732      * Tests the case where the lock is freed, but doUnlock is not completed, between the
733      * call to isUnavailable() and the call to compute().
734      */
735     @Test
736     public void testDistributedLockFreeLockFreed() {
737         feature = new FreeWithFreeLockingFeature(false);
738
739         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
740
741         assertFalse(lock.free());
742         assertTrue(lock.isUnavailable());
743     }
744
745     @Test
746     public void testDistributedLockExtend() {
747         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
748
749         // lock2 should be denied - called back by this thread
750         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
751         verify(callback, never()).lockAvailable(lock2);
752         verify(callback).lockUnavailable(lock2);
753
754         // lock2 will still be denied - called back by this thread
755         lock2.extend(HOLD_SEC, callback);
756         verify(callback, times(2)).lockUnavailable(lock2);
757
758         // force lock2 to be active - should still be denied
759         Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
760         lock2.extend(HOLD_SEC, callback);
761         verify(callback, times(3)).lockUnavailable(lock2);
762
763         assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
764                         .withMessageContaining("holdSec");
765
766         // execute doLock()
767         runLock(0, 0);
768         assertTrue(lock.isActive());
769
770         // now extend the first lock
771         LockCallback callback2 = mock(LockCallback.class);
772         lock.extend(HOLD_SEC2, callback2);
773         assertTrue(lock.isWaiting());
774
775         // execute doExtend()
776         runLock(1, 0);
777         lock.extend(HOLD_SEC2, callback2);
778         assertEquals(HOLD_SEC2, lock.getHoldSec());
779         verify(callback2).lockAvailable(lock);
780         verify(callback2, never()).lockUnavailable(lock);
781     }
782
783     /**
784      * Tests that extend() works on a serialized lock with a new feature.
785      *
786      * @throws Exception if an error occurs
787      */
788     @Test
789     public void testDistributedLockExtendSerialized() throws Exception {
790         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
791
792         // run doLock
793         runLock(0, 0);
794         assertTrue(lock.isActive());
795
796         feature = new MyLockingFeature(true);
797
798         lock = roundTrip(lock);
799         assertTrue(lock.isActive());
800
801         LockCallback scallback = mock(LockCallback.class);
802
803         lock.extend(HOLD_SEC, scallback);
804         assertTrue(lock.isWaiting());
805
806         // run doExtend (in new feature)
807         runLock(0, 0);
808         assertTrue(lock.isActive());
809
810         verify(scallback).lockAvailable(lock);
811         verify(scallback, never()).lockUnavailable(lock);
812     }
813
814     /**
815      * Tests extend() on a serialized lock without a feature.
816      *
817      * @throws Exception if an error occurs
818      */
819     @Test
820     public void testDistributedLockExtendNoFeature() throws Exception {
821         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
822
823         // run doLock
824         runLock(0, 0);
825         assertTrue(lock.isActive());
826
827         DistributedLockManager.setLatestInstance(null);
828
829         lock = roundTrip(lock);
830         assertTrue(lock.isActive());
831
832         LockCallback scallback = mock(LockCallback.class);
833
834         lock.extend(HOLD_SEC, scallback);
835         assertTrue(lock.isUnavailable());
836
837         verify(scallback, never()).lockAvailable(lock);
838         verify(scallback).lockUnavailable(lock);
839     }
840
841     /**
842      * Tests the case where the lock is freed and doUnlock called between the call to
843      * isUnavailable() and the call to compute().
844      */
845     @Test
846     public void testDistributedLockExtendUnlocked() {
847         feature = new FreeWithFreeLockingFeature(true);
848
849         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
850
851         lock.extend(HOLD_SEC2, callback);
852         assertTrue(lock.isUnavailable());
853         verify(callback).lockUnavailable(lock);
854     }
855
856     /**
857      * Tests the case where the lock is freed, but doUnlock is not completed, between the
858      * call to isUnavailable() and the call to compute().
859      */
860     @Test
861     public void testDistributedLockExtendLockFreed() {
862         feature = new FreeWithFreeLockingFeature(false);
863
864         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
865
866         lock.extend(HOLD_SEC2, callback);
867         assertTrue(lock.isUnavailable());
868         verify(callback).lockUnavailable(lock);
869     }
870
871     @Test
872     public void testDistributedLockScheduleRequest() {
873         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
874         runLock(0, 0);
875
876         verify(callback).lockAvailable(lock);
877     }
878
879     @Test
880     public void testDistributedLockRescheduleRequest() throws SQLException {
881         // use a data source that throws an exception when getConnection() is called
882         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
883         feature = invfeat;
884
885         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
886
887         // invoke doLock - should fail and reschedule
888         runLock(0, 0);
889
890         // should still be waiting
891         assertTrue(lock.isWaiting());
892         verify(callback, never()).lockUnavailable(lock);
893
894         // free the lock while doLock is executing
895         invfeat.freeLock = true;
896
897         // try scheduled request - should just invoke doUnlock
898         runSchedule(0, 0);
899
900         // should still be waiting
901         assertTrue(lock.isUnavailable());
902         verify(callback, never()).lockUnavailable(lock);
903
904         // should have scheduled a retry of doUnlock
905         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
906     }
907
908     @Test
909     public void testDistributedLockGetNextRequest() {
910         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
911
912         /*
913          * run doLock. This should cause getNextRequest() to be called twice, once with a
914          * request in the queue, and the second time with request=null.
915          */
916         runLock(0, 0);
917     }
918
919     /**
920      * Tests getNextRequest(), where the same request is still in the queue the second
921      * time it's called.
922      */
923     @Test
924     public void testDistributedLockGetNextRequestSameRequest() {
925         // force reschedule to be invoked
926         feature = new InvalidDbLockingFeature(TRANSIENT);
927
928         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
929
930         /*
931          * run doLock. This should cause getNextRequest() to be called twice, once with a
932          * request in the queue, and the second time with the same request again.
933          */
934         runLock(0, 0);
935
936         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
937     }
938
939     @Test
940     public void testDistributedLockDoRequest() throws SQLException {
941         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
942
943         assertTrue(lock.isWaiting());
944
945         // run doLock via doRequest
946         runLock(0, 0);
947
948         assertTrue(lock.isActive());
949     }
950
951     /**
952      * Tests doRequest(), when doRequest() is already running within another thread.
953      */
954     @Test
955     public void testDistributedLockDoRequestBusy() {
956         /*
957          * this feature will invoke a request in a background thread while it's being run
958          * in a foreground thread.
959          */
960         AtomicBoolean running = new AtomicBoolean(false);
961         AtomicBoolean returned = new AtomicBoolean(false);
962
963         feature = new MyLockingFeature(true) {
964             @Override
965             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
966                             LockCallback callback) {
967                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
968                     private static final long serialVersionUID = 1L;
969
970                     @Override
971                     protected boolean doDbInsert(Connection conn) throws SQLException {
972                         if (running.get()) {
973                             // already inside the thread - don't recurse any further
974                             return super.doDbInsert(conn);
975                         }
976
977                         running.set(true);
978
979                         Thread thread = new Thread(() -> {
980                             // run doLock from within the new thread
981                             runLock(0, 0);
982                         });
983                         thread.setDaemon(true);
984                         thread.start();
985
986                         // wait for the background thread to complete before continuing
987                         try {
988                             thread.join(5000);
989                         } catch (InterruptedException ignore) {
990                             Thread.currentThread().interrupt();
991                         }
992
993                         returned.set(!thread.isAlive());
994
995                         return super.doDbInsert(conn);
996                     }
997                 };
998             }
999         };
1000
1001         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1002
1003         // run doLock
1004         runLock(0, 0);
1005
1006         assertTrue(returned.get());
1007     }
1008
1009     /**
1010      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1011      *
1012      * @throws SQLException if an error occurs
1013      */
1014     @Test
1015     public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1016         // throw run-time exception
1017         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1018
1019         // use a data source that throws an exception when getConnection() is called
1020         feature = new MyLockingFeature(true) {
1021             @Override
1022             protected BasicDataSource makeDataSource() throws Exception {
1023                 return datasrc;
1024             }
1025         };
1026
1027         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1028
1029         // invoke doLock - should NOT reschedule
1030         runLock(0, 0);
1031
1032         assertTrue(lock.isUnavailable());
1033         verify(callback).lockUnavailable(lock);
1034
1035         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1036     }
1037
1038     /**
1039      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1040      * state.
1041      *
1042      * @throws SQLException if an error occurs
1043      */
1044     @Test
1045     public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1046         // throw run-time exception
1047         when(datasrc.getConnection()).thenAnswer(answer -> {
1048             lock.free();
1049             throw new IllegalStateException(EXPECTED_EXCEPTION);
1050         });
1051
1052         // use a data source that throws an exception when getConnection() is called
1053         feature = new MyLockingFeature(true) {
1054             @Override
1055             protected BasicDataSource makeDataSource() throws Exception {
1056                 return datasrc;
1057             }
1058         };
1059
1060         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1061
1062         // invoke doLock - should NOT reschedule
1063         runLock(0, 0);
1064
1065         assertTrue(lock.isUnavailable());
1066         verify(callback, never()).lockUnavailable(lock);
1067
1068         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1069     }
1070
1071     /**
1072      * Tests doRequest() when the retry count gets exhausted.
1073      */
1074     @Test
1075     public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1076         // use a data source that throws an exception when getConnection() is called
1077         feature = new InvalidDbLockingFeature(TRANSIENT);
1078
1079         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1080
1081         // invoke doLock - should fail and reschedule
1082         runLock(0, 0);
1083
1084         // should still be waiting
1085         assertTrue(lock.isWaiting());
1086         verify(callback, never()).lockUnavailable(lock);
1087
1088         // try again, via SCHEDULER - first retry fails
1089         runSchedule(0, 0);
1090
1091         // should still be waiting
1092         assertTrue(lock.isWaiting());
1093         verify(callback, never()).lockUnavailable(lock);
1094
1095         // try again, via SCHEDULER - final retry fails
1096         runSchedule(1, 0);
1097         assertTrue(lock.isUnavailable());
1098
1099         // now callback should have been called
1100         verify(callback).lockUnavailable(lock);
1101     }
1102
1103     /**
1104      * Tests doRequest() when a non-transient DB exception is thrown.
1105      */
1106     @Test
1107     public void testDistributedLockDoRequestNotTransient() {
1108         /*
1109          * use a data source that throws a PERMANENT exception when getConnection() is
1110          * called
1111          */
1112         feature = new InvalidDbLockingFeature(PERMANENT);
1113
1114         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1115
1116         // invoke doLock - should fail
1117         runLock(0, 0);
1118
1119         assertTrue(lock.isUnavailable());
1120         verify(callback).lockUnavailable(lock);
1121
1122         // should not have scheduled anything new
1123         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1124         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1125     }
1126
1127     @Test
1128     public void testDistributedLockDoLock() throws SQLException {
1129         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1130
1131         // invoke doLock - should simply do an insert
1132         long tbegin = System.currentTimeMillis();
1133         runLock(0, 0);
1134
1135         assertEquals(1, getRecordCount());
1136         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1137         verify(callback).lockAvailable(lock);
1138     }
1139
1140     /**
1141      * Tests doLock() when the lock is freed before doLock runs.
1142      *
1143      * @throws SQLException if an error occurs
1144      */
1145     @Test
1146     public void testDistributedLockDoLockFreed() throws SQLException {
1147         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1148
1149         lock.setState(LockState.UNAVAILABLE);
1150
1151         // invoke doLock - should do nothing
1152         runLock(0, 0);
1153
1154         assertEquals(0, getRecordCount());
1155
1156         verify(callback, never()).lockAvailable(lock);
1157     }
1158
1159     /**
1160      * Tests doLock() when a DB exception is thrown.
1161      */
1162     @Test
1163     public void testDistributedLockDoLockEx() {
1164         // use a data source that throws an exception when getConnection() is called
1165         feature = new InvalidDbLockingFeature(PERMANENT);
1166
1167         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1168
1169         // invoke doLock - should simply do an insert
1170         runLock(0, 0);
1171
1172         // lock should have failed due to exception
1173         verify(callback).lockUnavailable(lock);
1174     }
1175
1176     /**
1177      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1178      * to be called.
1179      */
1180     @Test
1181     public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1182         // insert an expired record
1183         insertRecord(RESOURCE, feature.getUuidString(), 0);
1184
1185         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1186
1187         // invoke doLock - should simply do an update
1188         runLock(0, 0);
1189         verify(callback).lockAvailable(lock);
1190     }
1191
1192     /**
1193      * Tests doLock() when a locked record already exists.
1194      */
1195     @Test
1196     public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1197         // insert an expired record
1198         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1199
1200         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1201
1202         // invoke doLock
1203         runLock(0, 0);
1204
1205         // lock should have failed because it's already locked
1206         verify(callback).lockUnavailable(lock);
1207     }
1208
1209     @Test
1210     public void testDistributedLockDoUnlock() throws SQLException {
1211         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1212
1213         // invoke doLock()
1214         runLock(0, 0);
1215
1216         lock.free();
1217
1218         // invoke doUnlock()
1219         long tbegin = System.currentTimeMillis();
1220         runLock(1, 0);
1221
1222         assertEquals(0, getRecordCount());
1223         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1224
1225         assertTrue(lock.isUnavailable());
1226
1227         // no more callbacks should have occurred
1228         verify(callback, times(1)).lockAvailable(lock);
1229         verify(callback, never()).lockUnavailable(lock);
1230     }
1231
1232     /**
1233      * Tests doUnlock() when a DB exception is thrown.
1234      *
1235      * @throws SQLException if an error occurs
1236      */
1237     @Test
1238     public void testDistributedLockDoUnlockEx() throws SQLException {
1239         feature = new InvalidDbLockingFeature(PERMANENT);
1240
1241         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1242
1243         // do NOT invoke doLock() - it will fail without a DB connection
1244
1245         lock.free();
1246
1247         // invoke doUnlock()
1248         runLock(1, 0);
1249
1250         assertTrue(lock.isUnavailable());
1251
1252         // no more callbacks should have occurred
1253         verify(callback, never()).lockAvailable(lock);
1254         verify(callback, never()).lockUnavailable(lock);
1255     }
1256
1257     @Test
1258     public void testDistributedLockDoExtend() throws SQLException {
1259         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1260         runLock(0, 0);
1261
1262         LockCallback callback2 = mock(LockCallback.class);
1263         lock.extend(HOLD_SEC2, callback2);
1264
1265         // call doExtend()
1266         long tbegin = System.currentTimeMillis();
1267         runLock(1, 0);
1268
1269         assertEquals(1, getRecordCount());
1270         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1271
1272         assertTrue(lock.isActive());
1273
1274         // no more callbacks should have occurred
1275         verify(callback).lockAvailable(lock);
1276         verify(callback, never()).lockUnavailable(lock);
1277
1278         // extension should have succeeded
1279         verify(callback2).lockAvailable(lock);
1280         verify(callback2, never()).lockUnavailable(lock);
1281     }
1282
1283     /**
1284      * Tests doExtend() when the lock is freed before doExtend runs.
1285      *
1286      * @throws SQLException if an error occurs
1287      */
1288     @Test
1289     public void testDistributedLockDoExtendFreed() throws SQLException {
1290         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1291         lock.extend(HOLD_SEC2, callback);
1292
1293         lock.setState(LockState.UNAVAILABLE);
1294
1295         // invoke doExtend - should do nothing
1296         runLock(1, 0);
1297
1298         assertEquals(0, getRecordCount());
1299
1300         verify(callback, never()).lockAvailable(lock);
1301     }
1302
1303     /**
1304      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1305      * insert.
1306      *
1307      * @throws SQLException if an error occurs
1308      */
1309     @Test
1310     public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1311         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1312         runLock(0, 0);
1313
1314         LockCallback callback2 = mock(LockCallback.class);
1315         lock.extend(HOLD_SEC2, callback2);
1316
1317         // delete the record so it's forced to re-insert it
1318         cleanDb();
1319
1320         // call doExtend()
1321         long tbegin = System.currentTimeMillis();
1322         runLock(1, 0);
1323
1324         assertEquals(1, getRecordCount());
1325         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1326
1327         assertTrue(lock.isActive());
1328
1329         // no more callbacks should have occurred
1330         verify(callback).lockAvailable(lock);
1331         verify(callback, never()).lockUnavailable(lock);
1332
1333         // extension should have succeeded
1334         verify(callback2).lockAvailable(lock);
1335         verify(callback2, never()).lockUnavailable(lock);
1336     }
1337
1338     /**
1339      * Tests doExtend() when both update and insert fail.
1340      *
1341      * @throws SQLException if an error occurs
1342      */
1343     @Test
1344     public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1345         /*
1346          * this feature will create a lock that returns false when doDbUpdate() is
1347          * invoked, or when doDbInsert() is invoked a second time
1348          */
1349         feature = new MyLockingFeature(true) {
1350             @Override
1351             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1352                             LockCallback callback) {
1353                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1354                     private static final long serialVersionUID = 1L;
1355                     private int ntimes = 0;
1356
1357                     @Override
1358                     protected boolean doDbInsert(Connection conn) throws SQLException {
1359                         if (ntimes++ > 0) {
1360                             return false;
1361                         }
1362
1363                         return super.doDbInsert(conn);
1364                     }
1365
1366                     @Override
1367                     protected boolean doDbUpdate(Connection conn) {
1368                         return false;
1369                     }
1370                 };
1371             }
1372         };
1373
1374         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1375         runLock(0, 0);
1376
1377         LockCallback callback2 = mock(LockCallback.class);
1378         lock.extend(HOLD_SEC2, callback2);
1379
1380         // call doExtend()
1381         runLock(1, 0);
1382
1383         assertTrue(lock.isUnavailable());
1384
1385         // no more callbacks should have occurred
1386         verify(callback).lockAvailable(lock);
1387         verify(callback, never()).lockUnavailable(lock);
1388
1389         // extension should have failed
1390         verify(callback2, never()).lockAvailable(lock);
1391         verify(callback2).lockUnavailable(lock);
1392     }
1393
1394     /**
1395      * Tests doExtend() when an exception occurs.
1396      *
1397      * @throws SQLException if an error occurs
1398      */
1399     @Test
1400     public void testDistributedLockDoExtendEx() throws SQLException {
1401         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1402         runLock(0, 0);
1403
1404         /*
1405          * delete the record and insert one with a different owner, which will cause
1406          * doDbInsert() to throw an exception
1407          */
1408         cleanDb();
1409         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1410
1411         LockCallback callback2 = mock(LockCallback.class);
1412         lock.extend(HOLD_SEC2, callback2);
1413
1414         // call doExtend()
1415         runLock(1, 0);
1416
1417         assertTrue(lock.isUnavailable());
1418
1419         // no more callbacks should have occurred
1420         verify(callback).lockAvailable(lock);
1421         verify(callback, never()).lockUnavailable(lock);
1422
1423         // extension should have failed
1424         verify(callback2, never()).lockAvailable(lock);
1425         verify(callback2).lockUnavailable(lock);
1426     }
1427
1428     @Test
1429     public void testDistributedLockToString() {
1430         String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1431         assertNotNull(text);
1432         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1433     }
1434
1435     @Test
1436     public void testMakeThreadPool() {
1437         // use a REAL feature to test this
1438         feature = new DistributedLockManager();
1439
1440         // this should create a thread pool
1441         feature.beforeCreateLockManager(engine, new Properties());
1442         feature.afterStart(engine);
1443
1444         assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
1445     }
1446
1447     /**
1448      * Performs a multi-threaded test of the locking facility.
1449      *
1450      * @throws InterruptedException if the current thread is interrupted while waiting for
1451      *         the background threads to complete
1452      */
1453     @Test
1454     public void testMultiThreaded() throws InterruptedException {
1455         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1456
1457         feature = new DistributedLockManager();
1458         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1459         feature.afterStart(PolicyEngineConstants.getManager());
1460
1461         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1462         for (int x = 0; x < MAX_THREADS; ++x) {
1463             threads.add(new MyThread());
1464         }
1465
1466         threads.forEach(Thread::start);
1467
1468         for (MyThread thread : threads) {
1469             thread.join(6000);
1470             assertFalse(thread.isAlive());
1471         }
1472
1473         for (MyThread thread : threads) {
1474             if (thread.err != null) {
1475                 throw thread.err;
1476             }
1477         }
1478
1479         assertTrue(nsuccesses.get() > 0);
1480     }
1481
1482     private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1483                     boolean waitForLock) {
1484         return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1485     }
1486
1487     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1488         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1489         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1490             oos.writeObject(lock);
1491         }
1492
1493         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1494         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1495             return (DistributedLock) ois.readObject();
1496         }
1497     }
1498
1499     /**
1500      * Runs the checkExpired() action.
1501      *
1502      * @param nskip number of actions in the work queue to skip
1503      * @param nadditional number of additional actions that appear in the work queue
1504      *        <i>after</i> the checkExpired action
1505      * @param schedSec number of seconds for which the checker should have been scheduled
1506      */
1507     private void runChecker(int nskip, int nadditional, long schedSec) {
1508         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1509         verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1510         Runnable action = captor.getAllValues().get(nskip);
1511         action.run();
1512     }
1513
1514     /**
1515      * Runs a lock action (e.g., doLock, doUnlock).
1516      *
1517      * @param nskip number of actions in the work queue to skip
1518      * @param nadditional number of additional actions that appear in the work queue
1519      *        <i>after</i> the desired action
1520      */
1521     void runLock(int nskip, int nadditional) {
1522         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1523         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1524
1525         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1526         action.run();
1527     }
1528
1529     /**
1530      * Runs a scheduled action (e.g., "retry" action).
1531      *
1532      * @param nskip number of actions in the work queue to skip
1533      * @param nadditional number of additional actions that appear in the work queue
1534      *        <i>after</i> the desired action
1535      */
1536     void runSchedule(int nskip, int nadditional) {
1537         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1538         verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1539
1540         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1541         action.run();
1542     }
1543
1544     /**
1545      * Gets a count of the number of lock records in the DB.
1546      *
1547      * @return the number of lock records in the DB
1548      * @throws SQLException if an error occurs accessing the DB
1549      */
1550     private int getRecordCount() throws SQLException {
1551         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1552                         ResultSet result = stmt.executeQuery()) {
1553
1554             if (result.next()) {
1555                 return result.getInt(1);
1556
1557             } else {
1558                 return 0;
1559             }
1560         }
1561     }
1562
1563     /**
1564      * Determines if there is a record for the given resource whose expiration time is in
1565      * the expected range.
1566      *
1567      * @param resourceId ID of the resource of interest
1568      * @param uuidString UUID string of the owner
1569      * @param holdSec seconds for which the lock was to be held
1570      * @param tbegin earliest time, in milliseconds, at which the record could have been
1571      *        inserted into the DB
1572      * @return {@code true} if a record is found, {@code false} otherwise
1573      * @throws SQLException if an error occurs accessing the DB
1574      */
1575     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1576         try (PreparedStatement stmt =
1577                         conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1578                                         + " WHERE resourceId=? AND host=? AND owner=?")) {
1579
1580             stmt.setString(1, resourceId);
1581             stmt.setString(2, feature.getHostName());
1582             stmt.setString(3, uuidString);
1583
1584             try (ResultSet result = stmt.executeQuery()) {
1585                 if (result.next()) {
1586                     int remaining = result.getInt(1);
1587                     long maxDiff = System.currentTimeMillis() - tbegin;
1588                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1589
1590                 } else {
1591                     return false;
1592                 }
1593             }
1594         }
1595     }
1596
1597     /**
1598      * Inserts a record into the DB.
1599      *
1600      * @param resourceId ID of the resource of interest
1601      * @param uuidString UUID string of the owner
1602      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1603      * @throws SQLException if an error occurs accessing the DB
1604      */
1605     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1606         this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
1607     }
1608
1609     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1610                     throws SQLException {
1611         try (PreparedStatement stmt =
1612                         conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1613                                         + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1614
1615             stmt.setString(1, resourceId);
1616             stmt.setString(2, hostName);
1617             stmt.setString(3, uuidString);
1618             stmt.setInt(4, expireOffset);
1619
1620             assertEquals(1, stmt.executeUpdate());
1621         }
1622     }
1623
1624     /**
1625      * Updates a record in the DB.
1626      *
1627      * @param resourceId ID of the resource of interest
1628      * @param newUuid UUID string of the <i>new</i> owner
1629      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1630      * @throws SQLException if an error occurs accessing the DB
1631      */
1632     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1633         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1634                         + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1635
1636             stmt.setString(1, newHost);
1637             stmt.setString(2, newUuid);
1638             stmt.setInt(3, expireOffset);
1639             stmt.setString(4, resourceId);
1640
1641             assertEquals(1, stmt.executeUpdate());
1642         }
1643     }
1644
1645     /**
1646      * Feature that uses <i>exsvc</i> to execute requests.
1647      */
1648     private class MyLockingFeature extends DistributedLockManager {
1649
1650         public MyLockingFeature(boolean init) {
1651             shutdownFeature();
1652
1653             exsvc = mock(ScheduledExecutorService.class);
1654             when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1655             Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1656
1657             if (init) {
1658                 beforeCreateLockManager(engine, new Properties());
1659                 start();
1660                 afterStart(engine);
1661             }
1662         }
1663     }
1664
1665     /**
1666      * Feature whose data source all throws exceptions.
1667      */
1668     private class InvalidDbLockingFeature extends MyLockingFeature {
1669         private boolean isTransient;
1670         private boolean freeLock = false;
1671
1672         public InvalidDbLockingFeature(boolean isTransient) {
1673             // pass "false" because we have to set the error code BEFORE calling
1674             // afterStart()
1675             super(false);
1676
1677             this.isTransient = isTransient;
1678
1679             this.beforeCreateLockManager(engine, new Properties());
1680             this.start();
1681             this.afterStart(engine);
1682         }
1683
1684         @Override
1685         protected BasicDataSource makeDataSource() throws Exception {
1686             when(datasrc.getConnection()).thenAnswer(answer -> {
1687                 if (freeLock) {
1688                     freeLock = false;
1689                     lock.free();
1690                 }
1691
1692                 throw makeEx();
1693             });
1694
1695             doThrow(makeEx()).when(datasrc).close();
1696
1697             return datasrc;
1698         }
1699
1700         protected SQLException makeEx() {
1701             if (isTransient) {
1702                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1703
1704             } else {
1705                 return new SQLException(EXPECTED_EXCEPTION);
1706             }
1707         }
1708     }
1709
1710     /**
1711      * Feature whose locks free themselves while free() is already running.
1712      */
1713     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1714         private boolean relock;
1715
1716         public FreeWithFreeLockingFeature(boolean relock) {
1717             super(true);
1718             this.relock = relock;
1719         }
1720
1721         @Override
1722         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1723                         LockCallback callback) {
1724
1725             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1726                 private static final long serialVersionUID = 1L;
1727                 private boolean checked = false;
1728
1729                 @Override
1730                 public boolean isUnavailable() {
1731                     if (checked) {
1732                         return super.isUnavailable();
1733                     }
1734
1735                     checked = true;
1736
1737                     // release and relock
1738                     free();
1739
1740                     if (relock) {
1741                         // run doUnlock
1742                         runLock(1, 0);
1743
1744                         // relock it
1745                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1746                     }
1747
1748                     return false;
1749                 }
1750             };
1751         }
1752     }
1753
1754     /**
1755      * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1756      * extend it, and then unlock it.
1757      */
1758     private class MyThread extends Thread {
1759         AssertionError err = null;
1760
1761         public MyThread() {
1762             setDaemon(true);
1763         }
1764
1765         @Override
1766         public void run() {
1767             try {
1768                 for (int x = 0; x < MAX_LOOPS; ++x) {
1769                     makeAttempt();
1770                 }
1771
1772             } catch (AssertionError e) {
1773                 err = e;
1774             }
1775         }
1776
1777         private void makeAttempt() {
1778             try {
1779                 Semaphore sem = new Semaphore(0);
1780
1781                 LockCallback cb = new LockCallback() {
1782                     @Override
1783                     public void lockAvailable(Lock lock) {
1784                         sem.release();
1785                     }
1786
1787                     @Override
1788                     public void lockUnavailable(Lock lock) {
1789                         sem.release();
1790                     }
1791                 };
1792
1793                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1794
1795                 // wait for callback, whether available or unavailable
1796                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1797                 if (!lock.isActive()) {
1798                     return;
1799                 }
1800
1801                 nsuccesses.incrementAndGet();
1802
1803                 assertEquals(1, nactive.incrementAndGet());
1804
1805                 lock.extend(HOLD_SEC2, cb);
1806                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1807                 assertTrue(lock.isActive());
1808
1809                 // decrement BEFORE free()
1810                 nactive.decrementAndGet();
1811
1812                 assertTrue(lock.free());
1813                 assertTrue(lock.isUnavailable());
1814
1815             } catch (InterruptedException e) {
1816                 Thread.currentThread().interrupt();
1817                 throw new AssertionError("interrupted", e);
1818             }
1819         }
1820     }
1821 }