48152715c918d615dcc3db8fb42c5edbb662e52c
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.distributed.locking;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNotNull;
30 import static org.junit.Assert.assertNotSame;
31 import static org.junit.Assert.assertNull;
32 import static org.junit.Assert.assertSame;
33 import static org.junit.Assert.assertTrue;
34 import static org.mockito.ArgumentMatchers.any;
35 import static org.mockito.ArgumentMatchers.anyBoolean;
36 import static org.mockito.ArgumentMatchers.anyLong;
37 import static org.mockito.ArgumentMatchers.eq;
38 import static org.mockito.Mockito.doThrow;
39 import static org.mockito.Mockito.mock;
40 import static org.mockito.Mockito.never;
41 import static org.mockito.Mockito.times;
42 import static org.mockito.Mockito.verify;
43 import static org.mockito.Mockito.when;
44
45 import java.io.ByteArrayInputStream;
46 import java.io.ByteArrayOutputStream;
47 import java.io.ObjectInputStream;
48 import java.io.ObjectOutputStream;
49 import java.sql.Connection;
50 import java.sql.DriverManager;
51 import java.sql.PreparedStatement;
52 import java.sql.ResultSet;
53 import java.sql.SQLException;
54 import java.sql.SQLTransientException;
55 import java.util.ArrayList;
56 import java.util.List;
57 import java.util.Properties;
58 import java.util.concurrent.Executors;
59 import java.util.concurrent.RejectedExecutionException;
60 import java.util.concurrent.ScheduledExecutorService;
61 import java.util.concurrent.ScheduledFuture;
62 import java.util.concurrent.Semaphore;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.atomic.AtomicBoolean;
65 import java.util.concurrent.atomic.AtomicInteger;
66 import java.util.concurrent.atomic.AtomicReference;
67 import org.apache.commons.dbcp2.BasicDataSource;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.kie.api.runtime.KieSession;
74 import org.mockito.ArgumentCaptor;
75 import org.mockito.Mock;
76 import org.mockito.MockitoAnnotations;
77 import org.onap.policy.common.utils.services.OrderedServiceImpl;
78 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
79 import org.onap.policy.drools.core.PolicySession;
80 import org.onap.policy.drools.core.lock.Lock;
81 import org.onap.policy.drools.core.lock.LockCallback;
82 import org.onap.policy.drools.core.lock.LockState;
83 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
84 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
85 import org.onap.policy.drools.system.PolicyEngine;
86 import org.onap.policy.drools.system.PolicyEngineConstants;
87 import org.powermock.reflect.Whitebox;
88
89 public class DistributedLockManagerTest {
    // interval, in seconds, that the tests pass to runChecker() for a regularly scheduled expiration check
    private static final long EXPIRE_SEC = 900L;
    // shorter interval, in seconds, that the tests expect when a check is re-scheduled after a DB failure
    private static final long RETRY_SEC = 60L;
    // name of the policy engine manager's field that is read and overwritten via Whitebox
    private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
    // host and owner values written into lock records to make them look like somebody else's
    private static final String OTHER_HOST = "other-host";
    private static final String OTHER_OWNER = "other-owner";
    // message attached to the exceptions that the tests throw on purpose
    private static final String EXPECTED_EXCEPTION = "expected exception";
    // H2 in-memory DB URL; its INIT clause creates the "pooling" schema if needed and selects it
    private static final String DB_CONNECTION =
                    "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
    private static final String DB_USER = "user";
    private static final String DB_PASSWORD = "password";
    // owner key and resource identifiers used when requesting locks
    private static final String OWNER_KEY = "my key";
    private static final String RESOURCE = "my resource";
    private static final String RESOURCE2 = "my resource #2";
    private static final String RESOURCE3 = "my resource #3";
    private static final String RESOURCE4 = "my resource #4";
    private static final String RESOURCE5 = "my resource #5";
    // hold times, in seconds, used when creating and extending locks
    private static final int HOLD_SEC = 100;
    private static final int HOLD_SEC2 = 120;
    // NOTE(review): not referenced in this part of the file - presumably limits for
    // multi-threaded tests further down; confirm
    private static final int MAX_THREADS = 5;
    private static final int MAX_LOOPS = 100;
    // flag values handed to the InvalidDbLockingFeature constructor; presumably they select
    // whether the simulated SQL failure is transient - confirm against that class
    private static final boolean TRANSIENT = true;
    private static final boolean PERMANENT = false;

    // number of execute() calls before the first lock attempt
    private static final int PRE_LOCK_EXECS = 1;

    // number of execute() calls before the first schedule attempt
    private static final int PRE_SCHED_EXECS = 1;

    // JDBC connection shared by all tests; opened in setUpBeforeClass(), closed in tearDownAfterClass()
    private static Connection conn = null;
    // executor found in the policy engine manager before the tests ran; restored in tearDownAfterClass()
    private static ScheduledExecutorService saveExec;
    // real three-thread scheduled pool created for the class; its users are outside this part of the file
    private static ScheduledExecutorService realExec;

    @Mock
    private PolicyEngine engine;

    @Mock
    private KieSession kieSess;

    @Mock
    private ScheduledExecutorService exsvc;

    @Mock
    private ScheduledFuture<?> checker;

    @Mock
    private LockCallback callback;

    @Mock
    private BasicDataSource datasrc;

    // lock under test and the session whose insertDrools() runs callbacks inline (see setUp())
    private DistributedLock lock;
    private PolicySession session;

    // counters reset to zero before each test; their consumers are outside this part of the file
    private AtomicInteger nactive;
    private AtomicInteger nsuccesses;
    // feature under test; re-created by setUp() and stopped by shutdownFeature()
    private DistributedLockManager feature;
147
148
149     /**
150      * Configures the location of the property files and creates the DB.
151      *
152      * @throws SQLException if the DB cannot be created
153      */
154     @BeforeClass
155     public static void setUpBeforeClass() throws SQLException {
156         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
157
158         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
159
160         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
161                         + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
162                         + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
163             createStmt.executeUpdate();
164         }
165
166         saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
167
168         realExec = Executors.newScheduledThreadPool(3);
169     }
170
171     /**
172      * Restores static fields.
173      */
174     @AfterClass
175     public static void tearDownAfterClass() throws SQLException {
176         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
177         realExec.shutdown();
178         conn.close();
179     }
180
181     /**
182      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
183      * tasks.
184      *
185      * @throws SQLException if the lock records cannot be deleted from the DB
186      */
187     @Before
188     public void setUp() throws SQLException {
189         MockitoAnnotations.initMocks(this);
190
191         // grant() and deny() calls will come through here and be immediately executed
192         session = new PolicySession(null, null, kieSess) {
193             @Override
194             public void insertDrools(Object object) {
195                 ((Runnable) object).run();
196             }
197         };
198
199         session.setPolicySession();
200
201         nactive = new AtomicInteger(0);
202         nsuccesses = new AtomicInteger(0);
203
204         cleanDb();
205
206         feature = new MyLockingFeature(true);
207     }
208
    /**
     * Stops the feature, if there is one, and then removes all lock records from the DB.
     *
     * @throws SQLException if the lock records cannot be deleted from the DB
     */
    @After
    public void tearDown() throws SQLException {
        shutdownFeature();
        cleanDb();
    }
214
215     private void cleanDb() throws SQLException {
216         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
217             stmt.executeUpdate();
218         }
219     }
220
221     private void shutdownFeature() {
222         if (feature != null) {
223             feature.afterStop(engine);
224             feature = null;
225         }
226     }
227
228     /**
229      * Tests that the feature is found in the expected service sets.
230      */
231     @Test
232     public void testServiceApis() {
233         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
234                         .anyMatch(obj -> obj instanceof DistributedLockManager));
235     }
236
237     @Test
238     public void testGetSequenceNumber() {
239         assertEquals(1000, feature.getSequenceNumber());
240     }
241
242     @Test
243     public void testBeforeCreateLockManager() {
244         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
245     }
246
247     /**
248      * Tests beforeCreate(), when getProperties() throws a runtime exception.
249      */
250     @Test
251     public void testBeforeCreateLockManagerEx() {
252         shutdownFeature();
253
254         feature = new MyLockingFeature(false) {
255             @Override
256             protected Properties getProperties(String fileName) {
257                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
258             }
259         };
260
261         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
262                         .isInstanceOf(DistributedLockManagerException.class);
263     }
264
265     @Test
266     public void testAfterStart() {
267         // verify that cleanup & expire check are both added to the queue
268         verify(exsvc).execute(any());
269         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
270     }
271
272     /**
273      * Tests afterStart(), when thread pool throws a runtime exception.
274      */
275     @Test
276     public void testAfterStartExInThreadPool() {
277         shutdownFeature();
278
279         feature = new MyLockingFeature(false);
280
281         doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
282
283         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
284     }
285
286     @Test
287     public void testDeleteExpiredDbLocks() throws SQLException {
288         // add records: two expired, one not
289         insertRecord(RESOURCE, feature.getUuidString(), -1);
290         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
291         insertRecord(RESOURCE3, OTHER_OWNER, 0);
292         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
293
294         // get the clean-up function and execute it
295         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
296         verify(exsvc).execute(captor.capture());
297
298         long tbegin = System.currentTimeMillis();
299         Runnable action = captor.getValue();
300         action.run();
301
302         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
303         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
304         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
305         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
306
307         assertEquals(2, getRecordCount());
308     }
309
310     /**
311      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
312      *
313      * @throws SQLException if an error occurs
314      */
315     @Test
316     public void testDeleteExpiredDbLocksEx() throws SQLException {
317         feature = new InvalidDbLockingFeature(TRANSIENT);
318
319         // get the clean-up function and execute it
320         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
321         verify(exsvc).execute(captor.capture());
322
323         Runnable action = captor.getValue();
324
325         // should not throw an exception
326         action.run();
327     }
328
329     @Test
330     public void testAfterStop() {
331         shutdownFeature();
332         verify(checker).cancel(anyBoolean());
333
334         feature = new DistributedLockManager();
335
336         // shutdown without calling afterStart()
337
338         shutdownFeature();
339     }
340
341     /**
342      * Tests afterStop(), when the data source throws an exception when close() is called.
343      *
344      * @throws SQLException if an error occurs
345      */
346     @Test
347     public void testAfterStopEx() throws SQLException {
348         shutdownFeature();
349
350         // use a data source that throws an exception when closed
351         feature = new InvalidDbLockingFeature(TRANSIENT);
352
353         assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
354     }
355
356     @Test
357     public void testCreateLock() throws SQLException {
358         verify(exsvc).execute(any());
359
360         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
361         assertTrue(lock.isWaiting());
362
363         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
364
365         // this lock should fail
366         LockCallback callback2 = mock(LockCallback.class);
367         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
368         assertTrue(lock2.isUnavailable());
369         verify(callback2, never()).lockAvailable(lock2);
370         verify(callback2).lockUnavailable(lock2);
371
372         // this should fail, too
373         LockCallback callback3 = mock(LockCallback.class);
374         DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
375         assertTrue(lock3.isUnavailable());
376         verify(callback3, never()).lockAvailable(lock3);
377         verify(callback3).lockUnavailable(lock3);
378
379         // no change to first
380         assertTrue(lock.isWaiting());
381
382         // no callbacks to the first lock
383         verify(callback, never()).lockAvailable(lock);
384         verify(callback, never()).lockUnavailable(lock);
385
386         assertTrue(lock.isWaiting());
387         assertEquals(0, getRecordCount());
388
389         runLock(0, 0);
390         assertTrue(lock.isActive());
391         assertEquals(1, getRecordCount());
392
393         verify(callback).lockAvailable(lock);
394         verify(callback, never()).lockUnavailable(lock);
395
396         // this should succeed
397         DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
398         assertTrue(lock4.isWaiting());
399
400         // after running checker, original records should still remain
401         runChecker(0, 0, EXPIRE_SEC);
402         assertEquals(1, getRecordCount());
403         verify(callback, never()).lockUnavailable(lock);
404     }
405
406     /**
407      * Tests createLock() when the feature is not the latest instance.
408      */
409     @Test
410     public void testCreateLockNotLatestInstance() {
411         DistributedLockManager.setLatestInstance(null);
412
413         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
414         assertTrue(lock.isUnavailable());
415         verify(callback, never()).lockAvailable(any());
416         verify(callback).lockUnavailable(lock);
417     }
418
419     @Test
420     public void testCheckExpired() throws SQLException {
421         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
422         runLock(0, 0);
423
424         LockCallback callback2 = mock(LockCallback.class);
425         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
426         runLock(1, 0);
427
428         LockCallback callback3 = mock(LockCallback.class);
429         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
430         runLock(2, 0);
431
432         LockCallback callback4 = mock(LockCallback.class);
433         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
434         runLock(3, 0);
435
436         LockCallback callback5 = mock(LockCallback.class);
437         final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
438         runLock(4, 0);
439
440         assertEquals(5, getRecordCount());
441
442         // expire one record
443         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
444
445         // change host of another record
446         updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
447
448         // change uuid of another record
449         updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
450
451
452         // run the checker
453         runChecker(0, 0, EXPIRE_SEC);
454
455
456         // check lock states
457         assertTrue(lock.isUnavailable());
458         assertTrue(lock2.isActive());
459         assertTrue(lock3.isUnavailable());
460         assertTrue(lock4.isActive());
461         assertTrue(lock5.isUnavailable());
462
463         // allow callbacks
464         runLock(2, 2);
465         runLock(3, 1);
466         runLock(4, 0);
467         verify(callback).lockUnavailable(lock);
468         verify(callback3).lockUnavailable(lock3);
469         verify(callback5).lockUnavailable(lock5);
470
471         verify(callback2, never()).lockUnavailable(lock2);
472         verify(callback4, never()).lockUnavailable(lock4);
473
474
475         // another check should have been scheduled, with the normal interval
476         runChecker(1, 0, EXPIRE_SEC);
477     }
478
479     /**
480      * Tests checkExpired(), when schedule() throws an exception.
481      */
482     @Test
483     public void testCheckExpiredExecRejected() {
484         // arrange for execution to be rejected
485         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
486                         .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
487
488         runChecker(0, 0, EXPIRE_SEC);
489     }
490
    /**
     * Tests checkExpired(), when getConnection() throws an exception. The checker is
     * expected to be re-scheduled with the shorter {@code RETRY_SEC} interval, which the
     * second runChecker() call looks for.
     */
    @Test
    public void testCheckExpiredSqlEx() {
        // use a data source that throws an exception when getConnection() is called
        feature = new InvalidDbLockingFeature(TRANSIENT);

        runChecker(0, 0, EXPIRE_SEC);

        // it should have scheduled another check, sooner
        runChecker(0, 0, RETRY_SEC);
    }
504
505     /**
506      * Tests checkExpired(), when getConnection() throws an exception and the feature is
507      * no longer alive.
508      */
509     @Test
510     public void testCheckExpiredSqlExFeatureStopped() {
511         // use a data source that throws an exception when getConnection() is called
512         feature = new InvalidDbLockingFeature(TRANSIENT) {
513             @Override
514             protected SQLException makeEx() {
515                 this.stop();
516                 return super.makeEx();
517             }
518         };
519
520         runChecker(0, 0, EXPIRE_SEC);
521
522         // it should NOT have scheduled another check
523         verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
524     }
525
526     @Test
527     public void testExpireLocks() throws SQLException {
528         AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
529
530         feature = new MyLockingFeature(true) {
531             @Override
532             protected BasicDataSource makeDataSource() throws Exception {
533                 // get the real data source
534                 BasicDataSource src2 = super.makeDataSource();
535
536                 when(datasrc.getConnection()).thenAnswer(answer -> {
537                     DistributedLock lck = freeLock.getAndSet(null);
538                     if (lck != null) {
539                         // free it
540                         lck.free();
541
542                         // run its doUnlock
543                         runLock(4, 0);
544                     }
545
546                     return src2.getConnection();
547                 });
548
549                 return datasrc;
550             }
551         };
552
553         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
554         runLock(0, 0);
555
556         LockCallback callback2 = mock(LockCallback.class);
557         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
558         runLock(1, 0);
559
560         LockCallback callback3 = mock(LockCallback.class);
561         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
562         // don't run doLock for lock3 - leave it in the waiting state
563
564         LockCallback callback4 = mock(LockCallback.class);
565         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
566         runLock(3, 0);
567
568         assertEquals(3, getRecordCount());
569
570         // expire one record
571         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
572
573         // arrange to free lock4 while the checker is running
574         freeLock.set(lock4);
575
576         // run the checker
577         runChecker(0, 0, EXPIRE_SEC);
578
579
580         // check lock states
581         assertTrue(lock.isUnavailable());
582         assertTrue(lock2.isActive());
583         assertTrue(lock3.isWaiting());
584         assertTrue(lock4.isUnavailable());
585
586         runLock(4, 0);
587         verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
588
589         verify(callback).lockUnavailable(lock);
590         verify(callback2, never()).lockUnavailable(lock2);
591         verify(callback3, never()).lockUnavailable(lock3);
592         verify(callback4, never()).lockUnavailable(lock4);
593     }
594
595     @Test
596     public void testDistributedLockNoArgs() {
597         DistributedLock lock = new DistributedLock();
598         assertNull(lock.getResourceId());
599         assertNull(lock.getOwnerKey());
600         assertNull(lock.getCallback());
601         assertEquals(0, lock.getHoldSec());
602     }
603
604     @Test
605     public void testDistributedLock() {
606         assertThatIllegalArgumentException()
607                         .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
608                         .withMessageContaining("holdSec");
609
610         // should generate no exception
611         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
612     }
613
614     @Test
615     public void testDistributedLockSerializable() throws Exception {
616         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
617         lock = roundTrip(lock);
618
619         assertTrue(lock.isWaiting());
620
621         assertEquals(RESOURCE, lock.getResourceId());
622         assertEquals(OWNER_KEY, lock.getOwnerKey());
623         assertNull(lock.getCallback());
624         assertEquals(HOLD_SEC, lock.getHoldSec());
625     }
626
627     @Test
628     public void testGrant() {
629         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
630         assertFalse(lock.isActive());
631
632         // execute the doLock() call
633         runLock(0, 0);
634
635         assertTrue(lock.isActive());
636
637         // the callback for the lock should have been run in the foreground thread
638         verify(callback).lockAvailable(lock);
639     }
640
641     @Test
642     public void testDistributedLockDeny() {
643         // get a lock
644         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
645
646         // get another lock - should fail
647         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
648
649         assertTrue(lock.isUnavailable());
650
651         // the callback for the second lock should have been run in the foreground thread
652         verify(callback).lockUnavailable(lock);
653
654         // should only have a request for the first lock
655         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
656     }
657
658     @Test
659     public void testDistributedLockFree() {
660         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
661
662         assertTrue(lock.free());
663         assertTrue(lock.isUnavailable());
664
665         // run both requests associated with the lock
666         runLock(0, 1);
667         runLock(1, 0);
668
669         // should not have changed state
670         assertTrue(lock.isUnavailable());
671
672         // attempt to free it again
673         assertFalse(lock.free());
674
675         // should not have queued anything else
676         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
677
678         // new lock should succeed
679         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
680         assertNotSame(lock2, lock);
681         assertTrue(lock2.isWaiting());
682     }
683
684     /**
685      * Tests that free() works on a serialized lock with a new feature.
686      *
687      * @throws Exception if an error occurs
688      */
689     @Test
690     public void testDistributedLockFreeSerialized() throws Exception {
691         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
692
693         feature = new MyLockingFeature(true);
694
695         lock = roundTrip(lock);
696         assertTrue(lock.free());
697         assertTrue(lock.isUnavailable());
698     }
699
700     /**
701      * Tests free() on a serialized lock without a feature.
702      *
703      * @throws Exception if an error occurs
704      */
705     @Test
706     public void testDistributedLockFreeNoFeature() throws Exception {
707         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
708
709         DistributedLockManager.setLatestInstance(null);
710
711         lock = roundTrip(lock);
712         assertFalse(lock.free());
713         assertTrue(lock.isUnavailable());
714     }
715
716     /**
717      * Tests the case where the lock is freed and doUnlock called between the call to
718      * isUnavailable() and the call to compute().
719      */
720     @Test
721     public void testDistributedLockFreeUnlocked() {
722         feature = new FreeWithFreeLockingFeature(true);
723
724         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
725
726         assertFalse(lock.free());
727         assertTrue(lock.isUnavailable());
728     }
729
730     /**
731      * Tests the case where the lock is freed, but doUnlock is not completed, between the
732      * call to isUnavailable() and the call to compute().
733      */
734     @Test
735     public void testDistributedLockFreeLockFreed() {
736         feature = new FreeWithFreeLockingFeature(false);
737
738         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
739
740         assertFalse(lock.free());
741         assertTrue(lock.isUnavailable());
742     }
743
744     @Test
745     public void testDistributedLockExtend() {
746         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
747
748         // lock2 should be denied - called back by this thread
749         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
750         verify(callback, never()).lockAvailable(lock2);
751         verify(callback).lockUnavailable(lock2);
752
753         // lock2 will still be denied - called back by this thread
754         lock2.extend(HOLD_SEC, callback);
755         verify(callback, times(2)).lockUnavailable(lock2);
756
757         // force lock2 to be active - should still be denied
758         Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
759         lock2.extend(HOLD_SEC, callback);
760         verify(callback, times(3)).lockUnavailable(lock2);
761
762         assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
763                         .withMessageContaining("holdSec");
764
765         // execute doLock()
766         runLock(0, 0);
767         assertTrue(lock.isActive());
768
769         // now extend the first lock
770         LockCallback callback2 = mock(LockCallback.class);
771         lock.extend(HOLD_SEC2, callback2);
772         assertTrue(lock.isWaiting());
773
774         // execute doExtend()
775         runLock(1, 0);
776         lock.extend(HOLD_SEC2, callback2);
777         assertEquals(HOLD_SEC2, lock.getHoldSec());
778         verify(callback2).lockAvailable(lock);
779         verify(callback2, never()).lockUnavailable(lock);
780     }
781
782     /**
783      * Tests that extend() works on a serialized lock with a new feature.
784      *
785      * @throws Exception if an error occurs
786      */
787     @Test
788     public void testDistributedLockExtendSerialized() throws Exception {
789         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
790
791         // run doLock
792         runLock(0, 0);
793         assertTrue(lock.isActive());
794
795         feature = new MyLockingFeature(true);
796
797         lock = roundTrip(lock);
798         assertTrue(lock.isActive());
799
800         LockCallback scallback = mock(LockCallback.class);
801
802         lock.extend(HOLD_SEC, scallback);
803         assertTrue(lock.isWaiting());
804
805         // run doExtend (in new feature)
806         runLock(0, 0);
807         assertTrue(lock.isActive());
808
809         verify(scallback).lockAvailable(lock);
810         verify(scallback, never()).lockUnavailable(lock);
811     }
812
813     /**
814      * Tests extend() on a serialized lock without a feature.
815      *
816      * @throws Exception if an error occurs
817      */
818     @Test
819     public void testDistributedLockExtendNoFeature() throws Exception {
820         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
821
822         // run doLock
823         runLock(0, 0);
824         assertTrue(lock.isActive());
825
826         DistributedLockManager.setLatestInstance(null);
827
828         lock = roundTrip(lock);
829         assertTrue(lock.isActive());
830
831         LockCallback scallback = mock(LockCallback.class);
832
833         lock.extend(HOLD_SEC, scallback);
834         assertTrue(lock.isUnavailable());
835
836         verify(scallback, never()).lockAvailable(lock);
837         verify(scallback).lockUnavailable(lock);
838     }
839
840     /**
841      * Tests the case where the lock is freed and doUnlock called between the call to
842      * isUnavailable() and the call to compute().
843      */
844     @Test
845     public void testDistributedLockExtendUnlocked() {
846         feature = new FreeWithFreeLockingFeature(true);
847
848         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
849
850         lock.extend(HOLD_SEC2, callback);
851         assertTrue(lock.isUnavailable());
852         verify(callback).lockUnavailable(lock);
853     }
854
855     /**
856      * Tests the case where the lock is freed, but doUnlock is not completed, between the
857      * call to isUnavailable() and the call to compute().
858      */
859     @Test
860     public void testDistributedLockExtendLockFreed() {
861         feature = new FreeWithFreeLockingFeature(false);
862
863         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
864
865         lock.extend(HOLD_SEC2, callback);
866         assertTrue(lock.isUnavailable());
867         verify(callback).lockUnavailable(lock);
868     }
869
870     @Test
871     public void testDistributedLockScheduleRequest() {
872         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
873         runLock(0, 0);
874
875         verify(callback).lockAvailable(lock);
876     }
877
878     @Test
879     public void testDistributedLockRescheduleRequest() throws SQLException {
880         // use a data source that throws an exception when getConnection() is called
881         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
882         feature = invfeat;
883
884         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
885
886         // invoke doLock - should fail and reschedule
887         runLock(0, 0);
888
889         // should still be waiting
890         assertTrue(lock.isWaiting());
891         verify(callback, never()).lockUnavailable(lock);
892
893         // free the lock while doLock is executing
894         invfeat.freeLock = true;
895
896         // try scheduled request - should just invoke doUnlock
897         runSchedule(0, 0);
898
899         // should still be waiting
900         assertTrue(lock.isUnavailable());
901         verify(callback, never()).lockUnavailable(lock);
902
903         // should have scheduled a retry of doUnlock
904         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
905     }
906
907     @Test
908     public void testDistributedLockGetNextRequest() {
909         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
910
911         /*
912          * run doLock. This should cause getNextRequest() to be called twice, once with a
913          * request in the queue, and the second time with request=null.
914          */
915         runLock(0, 0);
916     }
917
918     /**
919      * Tests getNextRequest(), where the same request is still in the queue the second
920      * time it's called.
921      */
922     @Test
923     public void testDistributedLockGetNextRequestSameRequest() {
924         // force reschedule to be invoked
925         feature = new InvalidDbLockingFeature(TRANSIENT);
926
927         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
928
929         /*
930          * run doLock. This should cause getNextRequest() to be called twice, once with a
931          * request in the queue, and the second time with the same request again.
932          */
933         runLock(0, 0);
934
935         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
936     }
937
938     @Test
939     public void testDistributedLockDoRequest() throws SQLException {
940         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
941
942         assertTrue(lock.isWaiting());
943
944         // run doLock via doRequest
945         runLock(0, 0);
946
947         assertTrue(lock.isActive());
948     }
949
950     /**
951      * Tests doRequest(), when doRequest() is already running within another thread.
952      */
953     @Test
954     public void testDistributedLockDoRequestBusy() {
955         /*
956          * this feature will invoke a request in a background thread while it's being run
957          * in a foreground thread.
958          */
959         AtomicBoolean running = new AtomicBoolean(false);
960         AtomicBoolean returned = new AtomicBoolean(false);
961
962         feature = new MyLockingFeature(true) {
963             @Override
964             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
965                             LockCallback callback) {
966                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
967                     private static final long serialVersionUID = 1L;
968
969                     @Override
970                     protected boolean doDbInsert(Connection conn) throws SQLException {
971                         if (running.get()) {
972                             // already inside the thread - don't recurse any further
973                             return super.doDbInsert(conn);
974                         }
975
976                         running.set(true);
977
978                         Thread thread = new Thread(() -> {
979                             // run doLock from within the new thread
980                             runLock(0, 0);
981                         });
982                         thread.setDaemon(true);
983                         thread.start();
984
985                         // wait for the background thread to complete before continuing
986                         try {
987                             thread.join(5000);
988                         } catch (InterruptedException ignore) {
989                             Thread.currentThread().interrupt();
990                         }
991
992                         returned.set(!thread.isAlive());
993
994                         return super.doDbInsert(conn);
995                     }
996                 };
997             }
998         };
999
1000         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1001
1002         // run doLock
1003         runLock(0, 0);
1004
1005         assertTrue(returned.get());
1006     }
1007
1008     /**
1009      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1010      *
1011      * @throws SQLException if an error occurs
1012      */
1013     @Test
1014     public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1015         // throw run-time exception
1016         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1017
1018         // use a data source that throws an exception when getConnection() is called
1019         feature = new MyLockingFeature(true) {
1020             @Override
1021             protected BasicDataSource makeDataSource() throws Exception {
1022                 return datasrc;
1023             }
1024         };
1025
1026         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1027
1028         // invoke doLock - should NOT reschedule
1029         runLock(0, 0);
1030
1031         assertTrue(lock.isUnavailable());
1032         verify(callback).lockUnavailable(lock);
1033
1034         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1035     }
1036
1037     /**
1038      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1039      * state.
1040      *
1041      * @throws SQLException if an error occurs
1042      */
1043     @Test
1044     public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1045         // throw run-time exception
1046         when(datasrc.getConnection()).thenAnswer(answer -> {
1047             lock.free();
1048             throw new IllegalStateException(EXPECTED_EXCEPTION);
1049         });
1050
1051         // use a data source that throws an exception when getConnection() is called
1052         feature = new MyLockingFeature(true) {
1053             @Override
1054             protected BasicDataSource makeDataSource() throws Exception {
1055                 return datasrc;
1056             }
1057         };
1058
1059         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1060
1061         // invoke doLock - should NOT reschedule
1062         runLock(0, 0);
1063
1064         assertTrue(lock.isUnavailable());
1065         verify(callback, never()).lockUnavailable(lock);
1066
1067         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1068     }
1069
1070     /**
1071      * Tests doRequest() when the retry count gets exhausted.
1072      */
1073     @Test
1074     public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1075         // use a data source that throws an exception when getConnection() is called
1076         feature = new InvalidDbLockingFeature(TRANSIENT);
1077
1078         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1079
1080         // invoke doLock - should fail and reschedule
1081         runLock(0, 0);
1082
1083         // should still be waiting
1084         assertTrue(lock.isWaiting());
1085         verify(callback, never()).lockUnavailable(lock);
1086
1087         // try again, via SCHEDULER - first retry fails
1088         runSchedule(0, 0);
1089
1090         // should still be waiting
1091         assertTrue(lock.isWaiting());
1092         verify(callback, never()).lockUnavailable(lock);
1093
1094         // try again, via SCHEDULER - final retry fails
1095         runSchedule(1, 0);
1096         assertTrue(lock.isUnavailable());
1097
1098         // now callback should have been called
1099         verify(callback).lockUnavailable(lock);
1100     }
1101
1102     /**
1103      * Tests doRequest() when a non-transient DB exception is thrown.
1104      */
1105     @Test
1106     public void testDistributedLockDoRequestNotTransient() {
1107         /*
1108          * use a data source that throws a PERMANENT exception when getConnection() is
1109          * called
1110          */
1111         feature = new InvalidDbLockingFeature(PERMANENT);
1112
1113         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1114
1115         // invoke doLock - should fail
1116         runLock(0, 0);
1117
1118         assertTrue(lock.isUnavailable());
1119         verify(callback).lockUnavailable(lock);
1120
1121         // should not have scheduled anything new
1122         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1123         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1124     }
1125
1126     @Test
1127     public void testDistributedLockDoLock() throws SQLException {
1128         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1129
1130         // invoke doLock - should simply do an insert
1131         long tbegin = System.currentTimeMillis();
1132         runLock(0, 0);
1133
1134         assertEquals(1, getRecordCount());
1135         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1136         verify(callback).lockAvailable(lock);
1137     }
1138
1139     /**
1140      * Tests doLock() when the lock is freed before doLock runs.
1141      *
1142      * @throws SQLException if an error occurs
1143      */
1144     @Test
1145     public void testDistributedLockDoLockFreed() throws SQLException {
1146         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1147
1148         lock.setState(LockState.UNAVAILABLE);
1149
1150         // invoke doLock - should do nothing
1151         runLock(0, 0);
1152
1153         assertEquals(0, getRecordCount());
1154
1155         verify(callback, never()).lockAvailable(lock);
1156     }
1157
1158     /**
1159      * Tests doLock() when a DB exception is thrown.
1160      */
1161     @Test
1162     public void testDistributedLockDoLockEx() {
1163         // use a data source that throws an exception when getConnection() is called
1164         feature = new InvalidDbLockingFeature(PERMANENT);
1165
1166         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1167
1168         // invoke doLock - should simply do an insert
1169         runLock(0, 0);
1170
1171         // lock should have failed due to exception
1172         verify(callback).lockUnavailable(lock);
1173     }
1174
1175     /**
1176      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1177      * to be called.
1178      */
1179     @Test
1180     public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1181         // insert an expired record
1182         insertRecord(RESOURCE, feature.getUuidString(), 0);
1183
1184         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1185
1186         // invoke doLock - should simply do an update
1187         runLock(0, 0);
1188         verify(callback).lockAvailable(lock);
1189     }
1190
1191     /**
1192      * Tests doLock() when a locked record already exists.
1193      */
1194     @Test
1195     public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1196         // insert an expired record
1197         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1198
1199         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1200
1201         // invoke doLock
1202         runLock(0, 0);
1203
1204         // lock should have failed because it's already locked
1205         verify(callback).lockUnavailable(lock);
1206     }
1207
1208     @Test
1209     public void testDistributedLockDoUnlock() throws SQLException {
1210         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1211
1212         // invoke doLock()
1213         runLock(0, 0);
1214
1215         lock.free();
1216
1217         // invoke doUnlock()
1218         long tbegin = System.currentTimeMillis();
1219         runLock(1, 0);
1220
1221         assertEquals(0, getRecordCount());
1222         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1223
1224         assertTrue(lock.isUnavailable());
1225
1226         // no more callbacks should have occurred
1227         verify(callback, times(1)).lockAvailable(lock);
1228         verify(callback, never()).lockUnavailable(lock);
1229     }
1230
1231     /**
1232      * Tests doUnlock() when a DB exception is thrown.
1233      *
1234      * @throws SQLException if an error occurs
1235      */
1236     @Test
1237     public void testDistributedLockDoUnlockEx() throws SQLException {
1238         feature = new InvalidDbLockingFeature(PERMANENT);
1239
1240         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1241
1242         // do NOT invoke doLock() - it will fail without a DB connection
1243
1244         lock.free();
1245
1246         // invoke doUnlock()
1247         runLock(1, 0);
1248
1249         assertTrue(lock.isUnavailable());
1250
1251         // no more callbacks should have occurred
1252         verify(callback, never()).lockAvailable(lock);
1253         verify(callback, never()).lockUnavailable(lock);
1254     }
1255
1256     @Test
1257     public void testDistributedLockDoExtend() throws SQLException {
1258         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1259         runLock(0, 0);
1260
1261         LockCallback callback2 = mock(LockCallback.class);
1262         lock.extend(HOLD_SEC2, callback2);
1263
1264         // call doExtend()
1265         long tbegin = System.currentTimeMillis();
1266         runLock(1, 0);
1267
1268         assertEquals(1, getRecordCount());
1269         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1270
1271         assertTrue(lock.isActive());
1272
1273         // no more callbacks should have occurred
1274         verify(callback).lockAvailable(lock);
1275         verify(callback, never()).lockUnavailable(lock);
1276
1277         // extension should have succeeded
1278         verify(callback2).lockAvailable(lock);
1279         verify(callback2, never()).lockUnavailable(lock);
1280     }
1281
1282     /**
1283      * Tests doExtend() when the lock is freed before doExtend runs.
1284      *
1285      * @throws SQLException if an error occurs
1286      */
1287     @Test
1288     public void testDistributedLockDoExtendFreed() throws SQLException {
1289         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1290         lock.extend(HOLD_SEC2, callback);
1291
1292         lock.setState(LockState.UNAVAILABLE);
1293
1294         // invoke doExtend - should do nothing
1295         runLock(1, 0);
1296
1297         assertEquals(0, getRecordCount());
1298
1299         verify(callback, never()).lockAvailable(lock);
1300     }
1301
1302     /**
1303      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1304      * insert.
1305      *
1306      * @throws SQLException if an error occurs
1307      */
1308     @Test
1309     public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1310         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1311         runLock(0, 0);
1312
1313         LockCallback callback2 = mock(LockCallback.class);
1314         lock.extend(HOLD_SEC2, callback2);
1315
1316         // delete the record so it's forced to re-insert it
1317         cleanDb();
1318
1319         // call doExtend()
1320         long tbegin = System.currentTimeMillis();
1321         runLock(1, 0);
1322
1323         assertEquals(1, getRecordCount());
1324         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1325
1326         assertTrue(lock.isActive());
1327
1328         // no more callbacks should have occurred
1329         verify(callback).lockAvailable(lock);
1330         verify(callback, never()).lockUnavailable(lock);
1331
1332         // extension should have succeeded
1333         verify(callback2).lockAvailable(lock);
1334         verify(callback2, never()).lockUnavailable(lock);
1335     }
1336
1337     /**
1338      * Tests doExtend() when both update and insert fail.
1339      *
1340      * @throws SQLException if an error occurs
1341      */
1342     @Test
1343     public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1344         /*
1345          * this feature will create a lock that returns false when doDbUpdate() is
1346          * invoked, or when doDbInsert() is invoked a second time
1347          */
1348         feature = new MyLockingFeature(true) {
1349             @Override
1350             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1351                             LockCallback callback) {
1352                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1353                     private static final long serialVersionUID = 1L;
1354                     private int ntimes = 0;
1355
1356                     @Override
1357                     protected boolean doDbInsert(Connection conn) throws SQLException {
1358                         if (ntimes++ > 0) {
1359                             return false;
1360                         }
1361
1362                         return super.doDbInsert(conn);
1363                     }
1364
1365                     @Override
1366                     protected boolean doDbUpdate(Connection conn) {
1367                         return false;
1368                     }
1369                 };
1370             }
1371         };
1372
1373         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1374         runLock(0, 0);
1375
1376         LockCallback callback2 = mock(LockCallback.class);
1377         lock.extend(HOLD_SEC2, callback2);
1378
1379         // call doExtend()
1380         runLock(1, 0);
1381
1382         assertTrue(lock.isUnavailable());
1383
1384         // no more callbacks should have occurred
1385         verify(callback).lockAvailable(lock);
1386         verify(callback, never()).lockUnavailable(lock);
1387
1388         // extension should have failed
1389         verify(callback2, never()).lockAvailable(lock);
1390         verify(callback2).lockUnavailable(lock);
1391     }
1392
1393     /**
1394      * Tests doExtend() when an exception occurs.
1395      *
1396      * @throws SQLException if an error occurs
1397      */
1398     @Test
1399     public void testDistributedLockDoExtendEx() throws SQLException {
1400         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1401         runLock(0, 0);
1402
1403         /*
1404          * delete the record and insert one with a different owner, which will cause
1405          * doDbInsert() to throw an exception
1406          */
1407         cleanDb();
1408         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1409
1410         LockCallback callback2 = mock(LockCallback.class);
1411         lock.extend(HOLD_SEC2, callback2);
1412
1413         // call doExtend()
1414         runLock(1, 0);
1415
1416         assertTrue(lock.isUnavailable());
1417
1418         // no more callbacks should have occurred
1419         verify(callback).lockAvailable(lock);
1420         verify(callback, never()).lockUnavailable(lock);
1421
1422         // extension should have failed
1423         verify(callback2, never()).lockAvailable(lock);
1424         verify(callback2).lockUnavailable(lock);
1425     }
1426
1427     @Test
1428     public void testDistributedLockToString() {
1429         String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1430         assertNotNull(text);
1431         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1432     }
1433
1434     @Test
1435     public void testMakeThreadPool() {
1436         // use a REAL feature to test this
1437         feature = new DistributedLockManager();
1438
1439         // this should create a thread pool
1440         feature.beforeCreateLockManager(engine, new Properties());
1441         feature.afterStart(engine);
1442
1443         assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
1444     }
1445
1446     /**
1447      * Performs a multi-threaded test of the locking facility.
1448      *
1449      * @throws InterruptedException if the current thread is interrupted while waiting for
1450      *         the background threads to complete
1451      */
1452     @Test
1453     public void testMultiThreaded() throws InterruptedException {
1454         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1455
1456         feature = new DistributedLockManager();
1457         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1458         feature.afterStart(PolicyEngineConstants.getManager());
1459
1460         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1461         for (int x = 0; x < MAX_THREADS; ++x) {
1462             threads.add(new MyThread());
1463         }
1464
1465         threads.forEach(Thread::start);
1466
1467         for (MyThread thread : threads) {
1468             thread.join(6000);
1469             assertFalse(thread.isAlive());
1470         }
1471
1472         for (MyThread thread : threads) {
1473             if (thread.err != null) {
1474                 throw thread.err;
1475             }
1476         }
1477
1478         assertTrue(nsuccesses.get() > 0);
1479     }
1480
1481     private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1482                     boolean waitForLock) {
1483         return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1484     }
1485
1486     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1487         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1488         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1489             oos.writeObject(lock);
1490         }
1491
1492         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1493         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1494             return (DistributedLock) ois.readObject();
1495         }
1496     }
1497
1498     /**
1499      * Runs the checkExpired() action.
1500      *
1501      * @param nskip number of actions in the work queue to skip
1502      * @param nadditional number of additional actions that appear in the work queue
1503      *        <i>after</i> the checkExpired action
1504      * @param schedSec number of seconds for which the checker should have been scheduled
1505      */
1506     private void runChecker(int nskip, int nadditional, long schedSec) {
1507         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1508         verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1509         Runnable action = captor.getAllValues().get(nskip);
1510         action.run();
1511     }
1512
1513     /**
1514      * Runs a lock action (e.g., doLock, doUnlock).
1515      *
1516      * @param nskip number of actions in the work queue to skip
1517      * @param nadditional number of additional actions that appear in the work queue
1518      *        <i>after</i> the desired action
1519      */
1520     void runLock(int nskip, int nadditional) {
1521         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1522         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1523
1524         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1525         action.run();
1526     }
1527
1528     /**
1529      * Runs a scheduled action (e.g., "retry" action).
1530      *
1531      * @param nskip number of actions in the work queue to skip
1532      * @param nadditional number of additional actions that appear in the work queue
1533      *        <i>after</i> the desired action
1534      */
1535     void runSchedule(int nskip, int nadditional) {
1536         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1537         verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1538
1539         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1540         action.run();
1541     }
1542
1543     /**
1544      * Gets a count of the number of lock records in the DB.
1545      *
1546      * @return the number of lock records in the DB
1547      * @throws SQLException if an error occurs accessing the DB
1548      */
1549     private int getRecordCount() throws SQLException {
1550         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1551                         ResultSet result = stmt.executeQuery()) {
1552
1553             if (result.next()) {
1554                 return result.getInt(1);
1555
1556             } else {
1557                 return 0;
1558             }
1559         }
1560     }
1561
1562     /**
1563      * Determines if there is a record for the given resource whose expiration time is in
1564      * the expected range.
1565      *
1566      * @param resourceId ID of the resource of interest
1567      * @param uuidString UUID string of the owner
1568      * @param holdSec seconds for which the lock was to be held
1569      * @param tbegin earliest time, in milliseconds, at which the record could have been
1570      *        inserted into the DB
1571      * @return {@code true} if a record is found, {@code false} otherwise
1572      * @throws SQLException if an error occurs accessing the DB
1573      */
1574     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1575         try (PreparedStatement stmt =
1576                         conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1577                                         + " WHERE resourceId=? AND host=? AND owner=?")) {
1578
1579             stmt.setString(1, resourceId);
1580             stmt.setString(2, feature.getHostName());
1581             stmt.setString(3, uuidString);
1582
1583             try (ResultSet result = stmt.executeQuery()) {
1584                 if (result.next()) {
1585                     int remaining = result.getInt(1);
1586                     long maxDiff = System.currentTimeMillis() - tbegin;
1587                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1588
1589                 } else {
1590                     return false;
1591                 }
1592             }
1593         }
1594     }
1595
1596     /**
1597      * Inserts a record into the DB.
1598      *
1599      * @param resourceId ID of the resource of interest
1600      * @param uuidString UUID string of the owner
1601      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1602      * @throws SQLException if an error occurs accessing the DB
1603      */
1604     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1605         this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
1606     }
1607
1608     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1609                     throws SQLException {
1610         try (PreparedStatement stmt =
1611                         conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1612                                         + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1613
1614             stmt.setString(1, resourceId);
1615             stmt.setString(2, hostName);
1616             stmt.setString(3, uuidString);
1617             stmt.setInt(4, expireOffset);
1618
1619             assertEquals(1, stmt.executeUpdate());
1620         }
1621     }
1622
1623     /**
1624      * Updates a record in the DB.
1625      *
1626      * @param resourceId ID of the resource of interest
1627      * @param newUuid UUID string of the <i>new</i> owner
1628      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1629      * @throws SQLException if an error occurs accessing the DB
1630      */
1631     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1632         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1633                         + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1634
1635             stmt.setString(1, newHost);
1636             stmt.setString(2, newUuid);
1637             stmt.setInt(3, expireOffset);
1638             stmt.setString(4, resourceId);
1639
1640             assertEquals(1, stmt.executeUpdate());
1641         }
1642     }
1643
1644     /**
1645      * Feature that uses <i>exsvc</i> to execute requests.
1646      */
1647     private class MyLockingFeature extends DistributedLockManager {
1648
1649         public MyLockingFeature(boolean init) {
1650             shutdownFeature();
1651
1652             exsvc = mock(ScheduledExecutorService.class);
1653             when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1654             Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1655
1656             if (init) {
1657                 beforeCreateLockManager(engine, new Properties());
1658                 start();
1659                 afterStart(engine);
1660             }
1661         }
1662     }
1663
1664     /**
1665      * Feature whose data source all throws exceptions.
1666      */
1667     private class InvalidDbLockingFeature extends MyLockingFeature {
1668         private boolean isTransient;
1669         private boolean freeLock = false;
1670
1671         public InvalidDbLockingFeature(boolean isTransient) {
1672             // pass "false" because we have to set the error code BEFORE calling
1673             // afterStart()
1674             super(false);
1675
1676             this.isTransient = isTransient;
1677
1678             this.beforeCreateLockManager(engine, new Properties());
1679             this.start();
1680             this.afterStart(engine);
1681         }
1682
1683         @Override
1684         protected BasicDataSource makeDataSource() throws Exception {
1685             when(datasrc.getConnection()).thenAnswer(answer -> {
1686                 if (freeLock) {
1687                     freeLock = false;
1688                     lock.free();
1689                 }
1690
1691                 throw makeEx();
1692             });
1693
1694             doThrow(makeEx()).when(datasrc).close();
1695
1696             return datasrc;
1697         }
1698
1699         protected SQLException makeEx() {
1700             if (isTransient) {
1701                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1702
1703             } else {
1704                 return new SQLException(EXPECTED_EXCEPTION);
1705             }
1706         }
1707     }
1708
1709     /**
1710      * Feature whose locks free themselves while free() is already running.
1711      */
1712     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1713         private boolean relock;
1714
1715         public FreeWithFreeLockingFeature(boolean relock) {
1716             super(true);
1717             this.relock = relock;
1718         }
1719
1720         @Override
1721         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1722                         LockCallback callback) {
1723
1724             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1725                 private static final long serialVersionUID = 1L;
1726                 private boolean checked = false;
1727
1728                 @Override
1729                 public boolean isUnavailable() {
1730                     if (checked) {
1731                         return super.isUnavailable();
1732                     }
1733
1734                     checked = true;
1735
1736                     // release and relock
1737                     free();
1738
1739                     if (relock) {
1740                         // run doUnlock
1741                         runLock(1, 0);
1742
1743                         // relock it
1744                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1745                     }
1746
1747                     return false;
1748                 }
1749             };
1750         }
1751     }
1752
1753     /**
1754      * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1755      * extend it, and then unlock it.
1756      */
1757     private class MyThread extends Thread {
1758         AssertionError err = null;
1759
1760         public MyThread() {
1761             setDaemon(true);
1762         }
1763
1764         @Override
1765         public void run() {
1766             try {
1767                 for (int x = 0; x < MAX_LOOPS; ++x) {
1768                     makeAttempt();
1769                 }
1770
1771             } catch (AssertionError e) {
1772                 err = e;
1773             }
1774         }
1775
1776         private void makeAttempt() {
1777             try {
1778                 Semaphore sem = new Semaphore(0);
1779
1780                 LockCallback cb = new LockCallback() {
1781                     @Override
1782                     public void lockAvailable(Lock lock) {
1783                         sem.release();
1784                     }
1785
1786                     @Override
1787                     public void lockUnavailable(Lock lock) {
1788                         sem.release();
1789                     }
1790                 };
1791
1792                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1793
1794                 // wait for callback, whether available or unavailable
1795                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1796                 if (!lock.isActive()) {
1797                     return;
1798                 }
1799
1800                 nsuccesses.incrementAndGet();
1801
1802                 assertEquals(1, nactive.incrementAndGet());
1803
1804                 lock.extend(HOLD_SEC2, cb);
1805                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1806                 assertTrue(lock.isActive());
1807
1808                 // decrement BEFORE free()
1809                 nactive.decrementAndGet();
1810
1811                 assertTrue(lock.free());
1812                 assertTrue(lock.isUnavailable());
1813
1814             } catch (InterruptedException e) {
1815                 Thread.currentThread().interrupt();
1816                 throw new AssertionError("interrupted", e);
1817             }
1818         }
1819     }
1820 }