9646a88ca7eb15d849a2b75cc6b47fab1d646006
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2023 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.distributed.locking;
23
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
27 import static org.assertj.core.api.Assertions.assertThatThrownBy;
28 import static org.junit.Assert.assertEquals;
29 import static org.junit.Assert.assertFalse;
30 import static org.junit.Assert.assertNotNull;
31 import static org.junit.Assert.assertNotSame;
32 import static org.junit.Assert.assertNull;
33 import static org.junit.Assert.assertSame;
34 import static org.junit.Assert.assertTrue;
35 import static org.mockito.ArgumentMatchers.any;
36 import static org.mockito.ArgumentMatchers.anyBoolean;
37 import static org.mockito.ArgumentMatchers.anyLong;
38 import static org.mockito.ArgumentMatchers.eq;
39 import static org.mockito.Mockito.doThrow;
40 import static org.mockito.Mockito.mock;
41 import static org.mockito.Mockito.never;
42 import static org.mockito.Mockito.times;
43 import static org.mockito.Mockito.verify;
44 import static org.mockito.Mockito.when;
45
46 import java.io.ByteArrayInputStream;
47 import java.io.ByteArrayOutputStream;
48 import java.io.ObjectInputStream;
49 import java.io.ObjectOutputStream;
50 import java.sql.Connection;
51 import java.sql.DriverManager;
52 import java.sql.PreparedStatement;
53 import java.sql.ResultSet;
54 import java.sql.SQLException;
55 import java.sql.SQLTransientException;
56 import java.util.ArrayList;
57 import java.util.List;
58 import java.util.Properties;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.RejectedExecutionException;
61 import java.util.concurrent.ScheduledExecutorService;
62 import java.util.concurrent.ScheduledFuture;
63 import java.util.concurrent.Semaphore;
64 import java.util.concurrent.TimeUnit;
65 import java.util.concurrent.atomic.AtomicBoolean;
66 import java.util.concurrent.atomic.AtomicInteger;
67 import java.util.concurrent.atomic.AtomicReference;
68 import org.apache.commons.dbcp2.BasicDataSource;
69 import org.junit.After;
70 import org.junit.AfterClass;
71 import org.junit.Before;
72 import org.junit.BeforeClass;
73 import org.junit.Test;
74 import org.junit.runner.RunWith;
75 import org.kie.api.runtime.KieSession;
76 import org.mockito.ArgumentCaptor;
77 import org.mockito.Mock;
78 import org.mockito.junit.MockitoJUnitRunner;
79 import org.onap.policy.common.utils.services.OrderedServiceImpl;
80 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
81 import org.onap.policy.drools.core.PolicySession;
82 import org.onap.policy.drools.core.lock.Lock;
83 import org.onap.policy.drools.core.lock.LockCallback;
84 import org.onap.policy.drools.core.lock.LockState;
85 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
86 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
87 import org.onap.policy.drools.system.PolicyEngine;
88 import org.onap.policy.drools.system.PolicyEngineConstants;
89 import org.springframework.test.util.ReflectionTestUtils;
90
91 @RunWith(MockitoJUnitRunner.class)
92 public class DistributedLockManagerTest {
93     private static final long EXPIRE_SEC = 900L;
94     private static final long RETRY_SEC = 60L;
95     private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
96     private static final String OTHER_HOST = "other-host";
97     private static final String OTHER_OWNER = "other-owner";
98     private static final String EXPECTED_EXCEPTION = "expected exception";
99     private static final String DB_CONNECTION =
100         "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
101     private static final String DB_USER = "user";
102     private static final String DB_PASSWORD = "password";
103     private static final String OWNER_KEY = "my key";
104     private static final String RESOURCE = "my resource";
105     private static final String RESOURCE2 = "my resource #2";
106     private static final String RESOURCE3 = "my resource #3";
107     private static final String RESOURCE4 = "my resource #4";
108     private static final String RESOURCE5 = "my resource #5";
109     private static final int HOLD_SEC = 100;
110     private static final int HOLD_SEC2 = 120;
111     private static final int MAX_THREADS = 5;
112     private static final int MAX_LOOPS = 100;
113     private static final boolean TRANSIENT = true;
114     private static final boolean PERMANENT = false;
115
116     // number of execute() calls before the first lock attempt
117     private static final int PRE_LOCK_EXECS = 1;
118
119     // number of execute() calls before the first schedule attempt
120     private static final int PRE_SCHED_EXECS = 1;
121
122     private static Connection conn = null;
123     private static ScheduledExecutorService saveExec;
124     private static ScheduledExecutorService realExec;
125
126     @Mock
127     private PolicyEngine engine;
128
129     @Mock
130     private KieSession kieSess;
131
132     @Mock
133     private ScheduledExecutorService exsvc;
134
135     @Mock
136     private ScheduledFuture<?> checker;
137
138     @Mock
139     private LockCallback callback;
140
141     @Mock
142     private BasicDataSource datasrc;
143
144     private DistributedLock lock;
145     private PolicySession session;
146
147     private AtomicInteger nactive;
148     private AtomicInteger nsuccesses;
149     private DistributedLockManager feature;
150
151     /**
152      * Configures the location of the property files and creates the DB.
153      *
154      * @throws SQLException if the DB cannot be created
155      */
156     @BeforeClass
157     public static void setUpBeforeClass() throws SQLException {
158         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
159         PolicyEngineConstants.getManager().configure(new Properties());
160
161         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
162
163         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
164             + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
165             + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
166             createStmt.executeUpdate();
167         }
168
169         saveExec = (ScheduledExecutorService) ReflectionTestUtils.getField(PolicyEngineConstants.getManager(),
170             POLICY_ENGINE_EXECUTOR_FIELD);
171
172         realExec = Executors.newScheduledThreadPool(3);
173     }
174
175     /**
176      * Restores static fields.
177      */
178     @AfterClass
179     public static void tearDownAfterClass() throws SQLException {
180         ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
181         realExec.shutdown();
182         conn.close();
183     }
184
185     /**
186      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
187      * tasks.
188      *
189      * @throws SQLException if the lock records cannot be deleted from the DB
190      */
191     @Before
192     public void setUp() throws SQLException {
193         // grant() and deny() calls will come through here and be immediately executed
194         session = new PolicySession(null, null, kieSess) {
195             @Override
196             public void insertDrools(Object object) {
197                 ((Runnable) object).run();
198             }
199         };
200
201         session.setPolicySession();
202
203         nactive = new AtomicInteger(0);
204         nsuccesses = new AtomicInteger(0);
205
206         cleanDb();
207
208         feature = new MyLockingFeature(true);
209     }
210
211     @After
212     public void tearDown() throws SQLException {
213         shutdownFeature();
214         cleanDb();
215     }
216
217     private void cleanDb() throws SQLException {
218         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
219             stmt.executeUpdate();
220         }
221     }
222
223     private void shutdownFeature() {
224         if (feature != null) {
225             feature.afterStop(engine);
226             feature = null;
227         }
228     }
229
230     /**
231      * Tests that the feature is found in the expected service sets.
232      */
233     @Test
234     public void testServiceApis() {
235         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
236             .anyMatch(obj -> obj instanceof DistributedLockManager));
237     }
238
239     @Test
240     public void testGetSequenceNumber() {
241         assertEquals(1000, feature.getSequenceNumber());
242     }
243
244     @Test
245     public void testBeforeCreateLockManager() {
246         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
247     }
248
249     /**
250      * Tests beforeCreate(), when getProperties() throws a runtime exception.
251      */
252     @Test
253     public void testBeforeCreateLockManagerEx() {
254         shutdownFeature();
255
256         feature = new MyLockingFeature(false) {
257             @Override
258             protected Properties getProperties(String fileName) {
259                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
260             }
261         };
262
263         Properties props = new Properties();
264         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, props))
265             .isInstanceOf(DistributedLockManagerException.class);
266     }
267
268     @Test
269     public void testAfterStart() {
270         // verify that cleanup & expire check are both added to the queue
271         verify(exsvc).execute(any());
272         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
273     }
274
275     /**
276      * Tests afterStart(), when thread pool throws a runtime exception.
277      */
278     @Test
279     public void testAfterStartExInThreadPool() {
280         shutdownFeature();
281
282         feature = new MyLockingFeature(false);
283
284         doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
285
286         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
287     }
288
289     @Test
290     public void testDeleteExpiredDbLocks() throws SQLException {
291         // add records: two expired, one not
292         insertRecord(RESOURCE, feature.getUuidString(), -1);
293         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
294         insertRecord(RESOURCE3, OTHER_OWNER, 0);
295         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
296
297         // get the clean-up function and execute it
298         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
299         verify(exsvc).execute(captor.capture());
300
301         long tbegin = System.currentTimeMillis();
302         Runnable action = captor.getValue();
303         action.run();
304
305         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
306         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
307         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
308         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
309
310         assertEquals(2, getRecordCount());
311     }
312
313     /**
314      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
315      *
316      */
317     @Test
318     public void testDeleteExpiredDbLocksEx() {
319         feature = new InvalidDbLockingFeature(TRANSIENT);
320
321         // get the clean-up function and execute it
322         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
323         verify(exsvc).execute(captor.capture());
324
325         Runnable action = captor.getValue();
326
327         // should not throw an exception
328         action.run();
329     }
330
331     @Test
332     public void testAfterStop() {
333         shutdownFeature();
334         verify(checker).cancel(anyBoolean());
335
336         feature = new DistributedLockManager();
337
338         // shutdown without calling afterStart()
339
340         shutdownFeature();
341     }
342
343     /**
344      * Tests afterStop(), when the data source throws an exception when close() is called.
345      *
346      */
347     @Test
348     public void testAfterStopEx() {
349         shutdownFeature();
350
351         // use a data source that throws an exception when closed
352         feature = new InvalidDbLockingFeature(TRANSIENT);
353
354         assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
355     }
356
357     @Test
358     public void testCreateLock() throws SQLException {
359         verify(exsvc).execute(any());
360
361         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
362         assertTrue(lock.isWaiting());
363
364         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
365
366         // this lock should fail
367         LockCallback callback2 = mock(LockCallback.class);
368         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
369         assertTrue(lock2.isUnavailable());
370         verify(callback2, never()).lockAvailable(lock2);
371         verify(callback2).lockUnavailable(lock2);
372
373         // this should fail, too
374         LockCallback callback3 = mock(LockCallback.class);
375         DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
376         assertTrue(lock3.isUnavailable());
377         verify(callback3, never()).lockAvailable(lock3);
378         verify(callback3).lockUnavailable(lock3);
379
380         // no change to first
381         assertTrue(lock.isWaiting());
382
383         // no callbacks to the first lock
384         verify(callback, never()).lockAvailable(lock);
385         verify(callback, never()).lockUnavailable(lock);
386
387         assertTrue(lock.isWaiting());
388         assertEquals(0, getRecordCount());
389
390         runLock(0, 0);
391         assertTrue(lock.isActive());
392         assertEquals(1, getRecordCount());
393
394         verify(callback).lockAvailable(lock);
395         verify(callback, never()).lockUnavailable(lock);
396
397         // this should succeed
398         DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
399         assertTrue(lock4.isWaiting());
400
401         // after running checker, original records should still remain
402         runChecker(0, 0, EXPIRE_SEC);
403         assertEquals(1, getRecordCount());
404         verify(callback, never()).lockUnavailable(lock);
405     }
406
407     /**
408      * Tests createLock() when the feature is not the latest instance.
409      */
410     @Test
411     public void testCreateLockNotLatestInstance() {
412         DistributedLockManager.setLatestInstance(null);
413
414         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
415         assertTrue(lock.isUnavailable());
416         verify(callback, never()).lockAvailable(any());
417         verify(callback).lockUnavailable(lock);
418     }
419
420     @Test
421     public void testCheckExpired() throws SQLException {
422         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
423         runLock(0, 0);
424
425         LockCallback callback2 = mock(LockCallback.class);
426         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
427         runLock(1, 0);
428
429         LockCallback callback3 = mock(LockCallback.class);
430         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
431         runLock(2, 0);
432
433         LockCallback callback4 = mock(LockCallback.class);
434         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
435         runLock(3, 0);
436
437         LockCallback callback5 = mock(LockCallback.class);
438         final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
439         runLock(4, 0);
440
441         assertEquals(5, getRecordCount());
442
443         // expire one record
444         updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
445
446         // change host of another record
447         updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
448
449         // change uuid of another record
450         updateRecord(RESOURCE5, feature.getPdpName(), OTHER_OWNER, HOLD_SEC);
451
452         // run the checker
453         runChecker(0, 0, EXPIRE_SEC);
454
455         // check lock states
456         assertTrue(lock.isUnavailable());
457         assertTrue(lock2.isActive());
458         assertTrue(lock3.isUnavailable());
459         assertTrue(lock4.isActive());
460         assertTrue(lock5.isUnavailable());
461
462         // allow callbacks
463         runLock(2, 2);
464         runLock(3, 1);
465         runLock(4, 0);
466         verify(callback).lockUnavailable(lock);
467         verify(callback3).lockUnavailable(lock3);
468         verify(callback5).lockUnavailable(lock5);
469
470         verify(callback2, never()).lockUnavailable(lock2);
471         verify(callback4, never()).lockUnavailable(lock4);
472
473         // another check should have been scheduled, with the normal interval
474         runChecker(1, 0, EXPIRE_SEC);
475     }
476
477     /**
478      * Tests checkExpired(), when schedule() throws an exception.
479      */
480     @Test
481     public void testCheckExpiredExecRejected() {
482         // arrange for execution to be rejected
483         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
484             .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
485
486         runChecker(0, 0, EXPIRE_SEC);
487     }
488
489     /**
490      * Tests checkExpired(), when getConnection() throws an exception.
491      */
492     @Test
493     public void testCheckExpiredSqlEx() {
494         // use a data source that throws an exception when getConnection() is called
495         feature = new InvalidDbLockingFeature(TRANSIENT);
496
497         runChecker(0, 0, EXPIRE_SEC);
498
499         // it should have scheduled another check, sooner
500         runChecker(0, 0, RETRY_SEC);
501     }
502
503     /**
504      * Tests checkExpired(), when getConnection() throws an exception and the feature is
505      * no longer alive.
506      */
507     @Test
508     public void testCheckExpiredSqlExFeatureStopped() {
509         // use a data source that throws an exception when getConnection() is called
510         feature = new InvalidDbLockingFeature(TRANSIENT) {
511             @Override
512             protected SQLException makeEx() {
513                 this.stop();
514                 return super.makeEx();
515             }
516         };
517
518         runChecker(0, 0, EXPIRE_SEC);
519
520         // it should NOT have scheduled another check
521         verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
522     }
523
524     @Test
525     public void testExpireLocks() throws SQLException {
526         AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
527
528         feature = new MyLockingFeature(true) {
529             @Override
530             protected BasicDataSource makeDataSource() throws Exception {
531                 // get the real data source
532                 BasicDataSource src2 = super.makeDataSource();
533
534                 when(datasrc.getConnection()).thenAnswer(answer -> {
535                     DistributedLock lck = freeLock.getAndSet(null);
536                     if (lck != null) {
537                         // free it
538                         lck.free();
539
540                         // run its doUnlock
541                         runLock(4, 0);
542                     }
543
544                     return src2.getConnection();
545                 });
546
547                 return datasrc;
548             }
549         };
550
551         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
552         runLock(0, 0);
553
554         LockCallback callback2 = mock(LockCallback.class);
555         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
556         runLock(1, 0);
557
558         LockCallback callback3 = mock(LockCallback.class);
559         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
560         // don't run doLock for lock3 - leave it in the waiting state
561
562         LockCallback callback4 = mock(LockCallback.class);
563         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
564         runLock(3, 0);
565
566         assertEquals(3, getRecordCount());
567
568         // expire one record
569         updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
570
571         // arrange to free lock4 while the checker is running
572         freeLock.set(lock4);
573
574         // run the checker
575         runChecker(0, 0, EXPIRE_SEC);
576
577         // check lock states
578         assertTrue(lock.isUnavailable());
579         assertTrue(lock2.isActive());
580         assertTrue(lock3.isWaiting());
581         assertTrue(lock4.isUnavailable());
582
583         runLock(4, 0);
584         verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
585
586         verify(callback).lockUnavailable(lock);
587         verify(callback2, never()).lockUnavailable(lock2);
588         verify(callback3, never()).lockUnavailable(lock3);
589         verify(callback4, never()).lockUnavailable(lock4);
590     }
591
592     @Test
593     public void testDistributedLockNoArgs() {
594         DistributedLock lock = new DistributedLock();
595         assertNull(lock.getResourceId());
596         assertNull(lock.getOwnerKey());
597         assertNull(lock.getCallback());
598         assertEquals(0, lock.getHoldSec());
599     }
600
601     @Test
602     public void testDistributedLock() {
603         assertThatIllegalArgumentException()
604             .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
605             .withMessageContaining("holdSec");
606
607         // should generate no exception
608         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
609     }
610
611     @Test
612     public void testDistributedLockSerializable() throws Exception {
613         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
614         lock = roundTrip(lock);
615
616         assertTrue(lock.isWaiting());
617
618         assertEquals(RESOURCE, lock.getResourceId());
619         assertEquals(OWNER_KEY, lock.getOwnerKey());
620         assertNull(lock.getCallback());
621         assertEquals(HOLD_SEC, lock.getHoldSec());
622     }
623
624     @Test
625     public void testGrant() {
626         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
627         assertFalse(lock.isActive());
628
629         // execute the doLock() call
630         runLock(0, 0);
631
632         assertTrue(lock.isActive());
633
634         // the callback for the lock should have been run in the foreground thread
635         verify(callback).lockAvailable(lock);
636     }
637
638     @Test
639     public void testDistributedLockDeny() {
640         // get a lock
641         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
642
643         // get another lock - should fail
644         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
645
646         assertTrue(lock.isUnavailable());
647
648         // the callback for the second lock should have been run in the foreground thread
649         verify(callback).lockUnavailable(lock);
650
651         // should only have a request for the first lock
652         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
653     }
654
655     @Test
656     public void testDistributedLockFree() {
657         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
658
659         assertTrue(lock.free());
660         assertTrue(lock.isUnavailable());
661
662         // run both requests associated with the lock
663         runLock(0, 1);
664         runLock(1, 0);
665
666         // should not have changed state
667         assertTrue(lock.isUnavailable());
668
669         // attempt to free it again
670         assertFalse(lock.free());
671
672         // should not have queued anything else
673         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
674
675         // new lock should succeed
676         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
677         assertNotSame(lock2, lock);
678         assertTrue(lock2.isWaiting());
679     }
680
681     /**
682      * Tests that free() works on a serialized lock with a new feature.
683      *
684      * @throws Exception if an error occurs
685      */
686     @Test
687     public void testDistributedLockFreeSerialized() throws Exception {
688         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
689
690         feature = new MyLockingFeature(true);
691
692         lock = roundTrip(lock);
693         assertTrue(lock.free());
694         assertTrue(lock.isUnavailable());
695     }
696
697     /**
698      * Tests free() on a serialized lock without a feature.
699      *
700      * @throws Exception if an error occurs
701      */
702     @Test
703     public void testDistributedLockFreeNoFeature() throws Exception {
704         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
705
706         DistributedLockManager.setLatestInstance(null);
707
708         lock = roundTrip(lock);
709         assertFalse(lock.free());
710         assertTrue(lock.isUnavailable());
711     }
712
713     /**
714      * Tests the case where the lock is freed and doUnlock called between the call to
715      * isUnavailable() and the call to compute().
716      */
717     @Test
718     public void testDistributedLockFreeUnlocked() {
719         feature = new FreeWithFreeLockingFeature(true);
720
721         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
722
723         assertFalse(lock.free());
724         assertTrue(lock.isUnavailable());
725     }
726
727     /**
728      * Tests the case where the lock is freed, but doUnlock is not completed, between the
729      * call to isUnavailable() and the call to compute().
730      */
731     @Test
732     public void testDistributedLockFreeLockFreed() {
733         feature = new FreeWithFreeLockingFeature(false);
734
735         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
736
737         assertFalse(lock.free());
738         assertTrue(lock.isUnavailable());
739     }
740
741     @Test
742     public void testDistributedLockExtend() {
743         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
744
745         // lock2 should be denied - called back by this thread
746         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
747         verify(callback, never()).lockAvailable(lock2);
748         verify(callback).lockUnavailable(lock2);
749
750         // lock2 will still be denied - called back by this thread
751         lock2.extend(HOLD_SEC, callback);
752         verify(callback, times(2)).lockUnavailable(lock2);
753
754         // force lock2 to be active - should still be denied
755         ReflectionTestUtils.setField(lock2, "state", LockState.ACTIVE);
756         lock2.extend(HOLD_SEC, callback);
757         verify(callback, times(3)).lockUnavailable(lock2);
758
759         assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
760             .withMessageContaining("holdSec");
761
762         // execute doLock()
763         runLock(0, 0);
764         assertTrue(lock.isActive());
765
766         // now extend the first lock
767         LockCallback callback2 = mock(LockCallback.class);
768         lock.extend(HOLD_SEC2, callback2);
769         assertTrue(lock.isWaiting());
770
771         // execute doExtend()
772         runLock(1, 0);
773         lock.extend(HOLD_SEC2, callback2);
774         assertEquals(HOLD_SEC2, lock.getHoldSec());
775         verify(callback2).lockAvailable(lock);
776         verify(callback2, never()).lockUnavailable(lock);
777     }
778
779     /**
780      * Tests that extend() works on a serialized lock with a new feature.
781      *
782      * @throws Exception if an error occurs
783      */
784     @Test
785     public void testDistributedLockExtendSerialized() throws Exception {
786         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
787
788         // run doLock
789         runLock(0, 0);
790         assertTrue(lock.isActive());
791
792         feature = new MyLockingFeature(true);
793
794         lock = roundTrip(lock);
795         assertTrue(lock.isActive());
796
797         LockCallback scallback = mock(LockCallback.class);
798
799         lock.extend(HOLD_SEC, scallback);
800         assertTrue(lock.isWaiting());
801
802         // run doExtend (in new feature)
803         runLock(0, 0);
804         assertTrue(lock.isActive());
805
806         verify(scallback).lockAvailable(lock);
807         verify(scallback, never()).lockUnavailable(lock);
808     }
809
810     /**
811      * Tests extend() on a serialized lock without a feature.
812      *
813      * @throws Exception if an error occurs
814      */
815     @Test
816     public void testDistributedLockExtendNoFeature() throws Exception {
817         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
818
819         // run doLock
820         runLock(0, 0);
821         assertTrue(lock.isActive());
822
823         DistributedLockManager.setLatestInstance(null);
824
825         lock = roundTrip(lock);
826         assertTrue(lock.isActive());
827
828         LockCallback scallback = mock(LockCallback.class);
829
830         lock.extend(HOLD_SEC, scallback);
831         assertTrue(lock.isUnavailable());
832
833         verify(scallback, never()).lockAvailable(lock);
834         verify(scallback).lockUnavailable(lock);
835     }
836
837     /**
838      * Tests the case where the lock is freed and doUnlock called between the call to
839      * isUnavailable() and the call to compute().
840      */
841     @Test
842     public void testDistributedLockExtendUnlocked() {
843         feature = new FreeWithFreeLockingFeature(true);
844
845         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
846
847         lock.extend(HOLD_SEC2, callback);
848         assertTrue(lock.isUnavailable());
849         verify(callback).lockUnavailable(lock);
850     }
851
852     /**
853      * Tests the case where the lock is freed, but doUnlock is not completed, between the
854      * call to isUnavailable() and the call to compute().
855      */
856     @Test
857     public void testDistributedLockExtendLockFreed() {
858         feature = new FreeWithFreeLockingFeature(false);
859
860         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
861
862         lock.extend(HOLD_SEC2, callback);
863         assertTrue(lock.isUnavailable());
864         verify(callback).lockUnavailable(lock);
865     }
866
867     @Test
868     public void testDistributedLockScheduleRequest() {
869         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
870         runLock(0, 0);
871
872         verify(callback).lockAvailable(lock);
873     }
874
875     @Test
876     public void testDistributedLockRescheduleRequest() {
877         // use a data source that throws an exception when getConnection() is called
878         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
879         feature = invfeat;
880
881         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
882
883         // invoke doLock - should fail and reschedule
884         runLock(0, 0);
885
886         // should still be waiting
887         assertTrue(lock.isWaiting());
888         verify(callback, never()).lockUnavailable(lock);
889
890         // free the lock while doLock is executing
891         invfeat.freeLock = true;
892
893         // try scheduled request - should just invoke doUnlock
894         runSchedule(0, 0);
895
896         // should still be waiting
897         assertTrue(lock.isUnavailable());
898         verify(callback, never()).lockUnavailable(lock);
899
900         // should have scheduled a retry of doUnlock
901         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
902     }
903
904     @Test
905     public void testDistributedLockGetNextRequest() {
906         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
907
908         /*
909          * run doLock. This should cause getNextRequest() to be called twice, once with a
910          * request in the queue, and the second time with request=null.
911          */
912         runLock(0, 0);
913     }
914
915     /**
916      * Tests getNextRequest(), where the same request is still in the queue the second
917      * time it's called.
918      */
919     @Test
920     public void testDistributedLockGetNextRequestSameRequest() {
921         // force reschedule to be invoked
922         feature = new InvalidDbLockingFeature(TRANSIENT);
923
924         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
925
926         /*
927          * run doLock. This should cause getNextRequest() to be called twice, once with a
928          * request in the queue, and the second time with the same request again.
929          */
930         runLock(0, 0);
931
932         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
933     }
934
935     @Test
936     public void testDistributedLockDoRequest() {
937         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
938
939         assertTrue(lock.isWaiting());
940
941         // run doLock via doRequest
942         runLock(0, 0);
943
944         assertTrue(lock.isActive());
945     }
946
947     /**
948      * Tests doRequest(), when doRequest() is already running within another thread.
949      */
950     @Test
951     public void testDistributedLockDoRequestBusy() {
952         /*
953          * this feature will invoke a request in a background thread while it's being run
954          * in a foreground thread.
955          */
956         AtomicBoolean running = new AtomicBoolean(false);
957         AtomicBoolean returned = new AtomicBoolean(false);
958
959         feature = new MyLockingFeature(true) {
960             @Override
961             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
962                 LockCallback callback) {
963                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
964                     private static final long serialVersionUID = 1L;
965
966                     @Override
967                     protected boolean doDbInsert(Connection conn) throws SQLException {
968                         if (running.get()) {
969                             // already inside the thread - don't recurse any further
970                             return super.doDbInsert(conn);
971                         }
972
973                         running.set(true);
974
975                         Thread thread = new Thread(() -> {
976                             // run doLock from within the new thread
977                             runLock(0, 0);
978                         });
979                         thread.setDaemon(true);
980                         thread.start();
981
982                         // wait for the background thread to complete before continuing
983                         try {
984                             thread.join(5000);
985                         } catch (InterruptedException ignore) {
986                             Thread.currentThread().interrupt();
987                         }
988
989                         returned.set(!thread.isAlive());
990
991                         return super.doDbInsert(conn);
992                     }
993                 };
994             }
995         };
996
997         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
998
999         // run doLock
1000         runLock(0, 0);
1001
1002         assertTrue(returned.get());
1003     }
1004
1005     /**
1006      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1007      *
1008      * @throws SQLException if an error occurs
1009      */
1010     @Test
1011     public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1012         // throw run-time exception
1013         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1014
1015         // use a data source that throws an exception when getConnection() is called
1016         feature = new MyLockingFeature(true) {
1017             @Override
1018             protected BasicDataSource makeDataSource() {
1019                 return datasrc;
1020             }
1021         };
1022
1023         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1024
1025         // invoke doLock - should NOT reschedule
1026         runLock(0, 0);
1027
1028         assertTrue(lock.isUnavailable());
1029         verify(callback).lockUnavailable(lock);
1030
1031         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1032     }
1033
1034     /**
1035      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1036      * state.
1037      *
1038      * @throws SQLException if an error occurs
1039      */
1040     @Test
1041     public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1042         // throw run-time exception
1043         when(datasrc.getConnection()).thenAnswer(answer -> {
1044             lock.free();
1045             throw new IllegalStateException(EXPECTED_EXCEPTION);
1046         });
1047
1048         // use a data source that throws an exception when getConnection() is called
1049         feature = new MyLockingFeature(true) {
1050             @Override
1051             protected BasicDataSource makeDataSource() {
1052                 return datasrc;
1053             }
1054         };
1055
1056         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1057
1058         // invoke doLock - should NOT reschedule
1059         runLock(0, 0);
1060
1061         assertTrue(lock.isUnavailable());
1062         verify(callback, never()).lockUnavailable(lock);
1063
1064         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1065     }
1066
1067     /**
1068      * Tests doRequest() when the retry count gets exhausted.
1069      */
1070     @Test
1071     public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1072         // use a data source that throws an exception when getConnection() is called
1073         feature = new InvalidDbLockingFeature(TRANSIENT);
1074
1075         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1076
1077         // invoke doLock - should fail and reschedule
1078         runLock(0, 0);
1079
1080         // should still be waiting
1081         assertTrue(lock.isWaiting());
1082         verify(callback, never()).lockUnavailable(lock);
1083
1084         // try again, via SCHEDULER - first retry fails
1085         runSchedule(0, 0);
1086
1087         // should still be waiting
1088         assertTrue(lock.isWaiting());
1089         verify(callback, never()).lockUnavailable(lock);
1090
1091         // try again, via SCHEDULER - final retry fails
1092         runSchedule(1, 0);
1093         assertTrue(lock.isUnavailable());
1094
1095         // now callback should have been called
1096         verify(callback).lockUnavailable(lock);
1097     }
1098
1099     /**
1100      * Tests doRequest() when a non-transient DB exception is thrown.
1101      */
1102     @Test
1103     public void testDistributedLockDoRequestNotTransient() {
1104         /*
1105          * use a data source that throws a PERMANENT exception when getConnection() is
1106          * called
1107          */
1108         feature = new InvalidDbLockingFeature(PERMANENT);
1109
1110         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1111
1112         // invoke doLock - should fail
1113         runLock(0, 0);
1114
1115         assertTrue(lock.isUnavailable());
1116         verify(callback).lockUnavailable(lock);
1117
1118         // should not have scheduled anything new
1119         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1120         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1121     }
1122
1123     @Test
1124     public void testDistributedLockDoLock() throws SQLException {
1125         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1126
1127         // invoke doLock - should simply do an insert
1128         long tbegin = System.currentTimeMillis();
1129         runLock(0, 0);
1130
1131         assertEquals(1, getRecordCount());
1132         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1133         verify(callback).lockAvailable(lock);
1134     }
1135
1136     /**
1137      * Tests doLock() when the lock is freed before doLock runs.
1138      *
1139      * @throws SQLException if an error occurs
1140      */
1141     @Test
1142     public void testDistributedLockDoLockFreed() throws SQLException {
1143         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1144
1145         lock.setState(LockState.UNAVAILABLE);
1146
1147         // invoke doLock - should do nothing
1148         runLock(0, 0);
1149
1150         assertEquals(0, getRecordCount());
1151
1152         verify(callback, never()).lockAvailable(lock);
1153     }
1154
1155     /**
1156      * Tests doLock() when a DB exception is thrown.
1157      */
1158     @Test
1159     public void testDistributedLockDoLockEx() {
1160         // use a data source that throws an exception when getConnection() is called
1161         feature = new InvalidDbLockingFeature(PERMANENT);
1162
1163         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1164
1165         // invoke doLock - should simply do an insert
1166         runLock(0, 0);
1167
1168         // lock should have failed due to exception
1169         verify(callback).lockUnavailable(lock);
1170     }
1171
1172     /**
1173      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1174      * to be called.
1175      */
1176     @Test
1177     public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1178         // insert an expired record
1179         insertRecord(RESOURCE, feature.getUuidString(), 0);
1180
1181         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1182
1183         // invoke doLock - should simply do an update
1184         runLock(0, 0);
1185         verify(callback).lockAvailable(lock);
1186     }
1187
1188     /**
1189      * Tests doLock() when a locked record already exists.
1190      */
1191     @Test
1192     public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1193         // insert an expired record
1194         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1195
1196         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1197
1198         // invoke doLock
1199         runLock(0, 0);
1200
1201         // lock should have failed because it's already locked
1202         verify(callback).lockUnavailable(lock);
1203     }
1204
1205     @Test
1206     public void testDistributedLockDoUnlock() throws SQLException {
1207         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1208
1209         // invoke doLock()
1210         runLock(0, 0);
1211
1212         lock.free();
1213
1214         // invoke doUnlock()
1215         long tbegin = System.currentTimeMillis();
1216         runLock(1, 0);
1217
1218         assertEquals(0, getRecordCount());
1219         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1220
1221         assertTrue(lock.isUnavailable());
1222
1223         // no more callbacks should have occurred
1224         verify(callback, times(1)).lockAvailable(lock);
1225         verify(callback, never()).lockUnavailable(lock);
1226     }
1227
1228     /**
1229      * Tests doUnlock() when a DB exception is thrown.
1230      *
1231      */
1232     @Test
1233     public void testDistributedLockDoUnlockEx() {
1234         feature = new InvalidDbLockingFeature(PERMANENT);
1235
1236         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1237
1238         // do NOT invoke doLock() - it will fail without a DB connection
1239
1240         lock.free();
1241
1242         // invoke doUnlock()
1243         runLock(1, 0);
1244
1245         assertTrue(lock.isUnavailable());
1246
1247         // no more callbacks should have occurred
1248         verify(callback, never()).lockAvailable(lock);
1249         verify(callback, never()).lockUnavailable(lock);
1250     }
1251
1252     @Test
1253     public void testDistributedLockDoExtend() throws SQLException {
1254         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1255         runLock(0, 0);
1256
1257         LockCallback callback2 = mock(LockCallback.class);
1258         lock.extend(HOLD_SEC2, callback2);
1259
1260         // call doExtend()
1261         long tbegin = System.currentTimeMillis();
1262         runLock(1, 0);
1263
1264         assertEquals(1, getRecordCount());
1265         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1266
1267         assertTrue(lock.isActive());
1268
1269         // no more callbacks should have occurred
1270         verify(callback).lockAvailable(lock);
1271         verify(callback, never()).lockUnavailable(lock);
1272
1273         // extension should have succeeded
1274         verify(callback2).lockAvailable(lock);
1275         verify(callback2, never()).lockUnavailable(lock);
1276     }
1277
1278     /**
1279      * Tests doExtend() when the lock is freed before doExtend runs.
1280      *
1281      * @throws SQLException if an error occurs
1282      */
1283     @Test
1284     public void testDistributedLockDoExtendFreed() throws SQLException {
1285         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1286         lock.extend(HOLD_SEC2, callback);
1287
1288         lock.setState(LockState.UNAVAILABLE);
1289
1290         // invoke doExtend - should do nothing
1291         runLock(1, 0);
1292
1293         assertEquals(0, getRecordCount());
1294
1295         verify(callback, never()).lockAvailable(lock);
1296     }
1297
1298     /**
1299      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1300      * insert.
1301      *
1302      * @throws SQLException if an error occurs
1303      */
1304     @Test
1305     public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1306         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1307         runLock(0, 0);
1308
1309         LockCallback callback2 = mock(LockCallback.class);
1310         lock.extend(HOLD_SEC2, callback2);
1311
1312         // delete the record so it's forced to re-insert it
1313         cleanDb();
1314
1315         // call doExtend()
1316         long tbegin = System.currentTimeMillis();
1317         runLock(1, 0);
1318
1319         assertEquals(1, getRecordCount());
1320         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1321
1322         assertTrue(lock.isActive());
1323
1324         // no more callbacks should have occurred
1325         verify(callback).lockAvailable(lock);
1326         verify(callback, never()).lockUnavailable(lock);
1327
1328         // extension should have succeeded
1329         verify(callback2).lockAvailable(lock);
1330         verify(callback2, never()).lockUnavailable(lock);
1331     }
1332
1333     /**
1334      * Tests doExtend() when both update and insert fail.
1335      *
1336      */
1337     @Test
1338     public void testDistributedLockDoExtendNeitherSucceeds() {
1339         /*
1340          * this feature will create a lock that returns false when doDbUpdate() is
1341          * invoked, or when doDbInsert() is invoked a second time
1342          */
1343         feature = new MyLockingFeature(true) {
1344             @Override
1345             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1346                 LockCallback callback) {
1347                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1348                     private static final long serialVersionUID = 1L;
1349                     private int ntimes = 0;
1350
1351                     @Override
1352                     protected boolean doDbInsert(Connection conn) throws SQLException {
1353                         if (ntimes++ > 0) {
1354                             return false;
1355                         }
1356
1357                         return super.doDbInsert(conn);
1358                     }
1359
1360                     @Override
1361                     protected boolean doDbUpdate(Connection conn) {
1362                         return false;
1363                     }
1364                 };
1365             }
1366         };
1367
1368         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1369         runLock(0, 0);
1370
1371         LockCallback callback2 = mock(LockCallback.class);
1372         lock.extend(HOLD_SEC2, callback2);
1373
1374         // call doExtend()
1375         runLock(1, 0);
1376
1377         assertTrue(lock.isUnavailable());
1378
1379         // no more callbacks should have occurred
1380         verify(callback).lockAvailable(lock);
1381         verify(callback, never()).lockUnavailable(lock);
1382
1383         // extension should have failed
1384         verify(callback2, never()).lockAvailable(lock);
1385         verify(callback2).lockUnavailable(lock);
1386     }
1387
1388     /**
1389      * Tests doExtend() when an exception occurs.
1390      *
1391      * @throws SQLException if an error occurs
1392      */
1393     @Test
1394     public void testDistributedLockDoExtendEx() throws SQLException {
1395         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1396         runLock(0, 0);
1397
1398         /*
1399          * delete the record and insert one with a different owner, which will cause
1400          * doDbInsert() to throw an exception
1401          */
1402         cleanDb();
1403         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1404
1405         LockCallback callback2 = mock(LockCallback.class);
1406         lock.extend(HOLD_SEC2, callback2);
1407
1408         // call doExtend()
1409         runLock(1, 0);
1410
1411         assertTrue(lock.isUnavailable());
1412
1413         // no more callbacks should have occurred
1414         verify(callback).lockAvailable(lock);
1415         verify(callback, never()).lockUnavailable(lock);
1416
1417         // extension should have failed
1418         verify(callback2, never()).lockAvailable(lock);
1419         verify(callback2).lockUnavailable(lock);
1420     }
1421
1422     @Test
1423     public void testDistributedLockToString() {
1424         String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1425         assertNotNull(text);
1426         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1427     }
1428
1429     @Test
1430     public void testMakeThreadPool() {
1431         // use a REAL feature to test this
1432         feature = new DistributedLockManager();
1433
1434         // this should create a thread pool
1435         feature.beforeCreateLockManager(engine, new Properties());
1436         feature.afterStart(engine);
1437
1438         assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
1439     }
1440
1441     /**
1442      * Performs a multi-threaded test of the locking facility.
1443      *
1444      * @throws InterruptedException if the current thread is interrupted while waiting for
1445      *         the background threads to complete
1446      */
1447     @Test
1448     public void testMultiThreaded() throws InterruptedException {
1449         ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1450
1451         feature = new DistributedLockManager();
1452         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1453         feature.afterStart(PolicyEngineConstants.getManager());
1454
1455         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1456         for (int x = 0; x < MAX_THREADS; ++x) {
1457             threads.add(new MyThread());
1458         }
1459
1460         threads.forEach(Thread::start);
1461
1462         for (MyThread thread : threads) {
1463             thread.join(6000);
1464             assertFalse(thread.isAlive());
1465         }
1466
1467         for (MyThread thread : threads) {
1468             if (thread.err != null) {
1469                 throw thread.err;
1470             }
1471         }
1472
1473         assertTrue(nsuccesses.get() > 0);
1474     }
1475
1476     private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1477         boolean waitForLock) {
1478         return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1479     }
1480
1481     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1482         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1483         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1484             oos.writeObject(lock);
1485         }
1486
1487         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1488         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1489             return (DistributedLock) ois.readObject();
1490         }
1491     }
1492
1493     /**
1494      * Runs the checkExpired() action.
1495      *
1496      * @param nskip number of actions in the work queue to skip
1497      * @param nadditional number of additional actions that appear in the work queue
1498      *        <i>after</i> the checkExpired action
1499      * @param schedSec number of seconds for which the checker should have been scheduled
1500      */
1501     private void runChecker(int nskip, int nadditional, long schedSec) {
1502         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1503         verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1504         Runnable action = captor.getAllValues().get(nskip);
1505         action.run();
1506     }
1507
1508     /**
1509      * Runs a lock action (e.g., doLock, doUnlock).
1510      *
1511      * @param nskip number of actions in the work queue to skip
1512      * @param nadditional number of additional actions that appear in the work queue
1513      *        <i>after</i> the desired action
1514      */
1515     void runLock(int nskip, int nadditional) {
1516         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1517         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1518
1519         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1520         action.run();
1521     }
1522
1523     /**
1524      * Runs a scheduled action (e.g., "retry" action).
1525      *
1526      * @param nskip number of actions in the work queue to skip
1527      * @param nadditional number of additional actions that appear in the work queue
1528      *        <i>after</i> the desired action
1529      */
1530     void runSchedule(int nskip, int nadditional) {
1531         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1532         verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1533
1534         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1535         action.run();
1536     }
1537
1538     /**
1539      * Gets a count of the number of lock records in the DB.
1540      *
1541      * @return the number of lock records in the DB
1542      * @throws SQLException if an error occurs accessing the DB
1543      */
1544     private int getRecordCount() throws SQLException {
1545         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1546             ResultSet result = stmt.executeQuery()) {
1547
1548             if (result.next()) {
1549                 return result.getInt(1);
1550
1551             } else {
1552                 return 0;
1553             }
1554         }
1555     }
1556
1557     /**
1558      * Determines if there is a record for the given resource whose expiration time is in
1559      * the expected range.
1560      *
1561      * @param resourceId ID of the resource of interest
1562      * @param uuidString UUID string of the owner
1563      * @param holdSec seconds for which the lock was to be held
1564      * @param tbegin earliest time, in milliseconds, at which the record could have been
1565      *        inserted into the DB
1566      * @return {@code true} if a record is found, {@code false} otherwise
1567      * @throws SQLException if an error occurs accessing the DB
1568      */
1569     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1570         try (PreparedStatement stmt =
1571             conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1572                 + " WHERE resourceId=? AND host=? AND owner=?")) {
1573
1574             stmt.setString(1, resourceId);
1575             stmt.setString(2, feature.getPdpName());
1576             stmt.setString(3, uuidString);
1577
1578             try (ResultSet result = stmt.executeQuery()) {
1579                 if (result.next()) {
1580                     int remaining = result.getInt(1);
1581                     long maxDiff = System.currentTimeMillis() - tbegin;
1582                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1583
1584                 } else {
1585                     return false;
1586                 }
1587             }
1588         }
1589     }
1590
1591     /**
1592      * Inserts a record into the DB.
1593      *
1594      * @param resourceId ID of the resource of interest
1595      * @param uuidString UUID string of the owner
1596      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1597      * @throws SQLException if an error occurs accessing the DB
1598      */
1599     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1600         this.insertRecord(resourceId, feature.getPdpName(), uuidString, expireOffset);
1601     }
1602
1603     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1604         throws SQLException {
1605         try (PreparedStatement stmt =
1606             conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1607                 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1608
1609             stmt.setString(1, resourceId);
1610             stmt.setString(2, hostName);
1611             stmt.setString(3, uuidString);
1612             stmt.setInt(4, expireOffset);
1613
1614             assertEquals(1, stmt.executeUpdate());
1615         }
1616     }
1617
1618     /**
1619      * Updates a record in the DB.
1620      *
1621      * @param resourceId ID of the resource of interest
1622      * @param newUuid UUID string of the <i>new</i> owner
1623      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1624      * @throws SQLException if an error occurs accessing the DB
1625      */
1626     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1627         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1628             + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1629
1630             stmt.setString(1, newHost);
1631             stmt.setString(2, newUuid);
1632             stmt.setInt(3, expireOffset);
1633             stmt.setString(4, resourceId);
1634
1635             assertEquals(1, stmt.executeUpdate());
1636         }
1637     }
1638
1639     /**
1640      * Feature that uses <i>exsvc</i> to execute requests.
1641      */
1642     private class MyLockingFeature extends DistributedLockManager {
1643
1644         public MyLockingFeature(boolean init) {
1645             shutdownFeature();
1646
1647             exsvc = mock(ScheduledExecutorService.class);
1648             when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1649             ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1650
1651             if (init) {
1652                 beforeCreateLockManager(engine, new Properties());
1653                 start();
1654                 afterStart(engine);
1655             }
1656         }
1657     }
1658
1659     /**
1660      * Feature whose data source all throws exceptions.
1661      */
1662     private class InvalidDbLockingFeature extends MyLockingFeature {
1663         private boolean isTransient;
1664         private boolean freeLock = false;
1665
1666         public InvalidDbLockingFeature(boolean isTransient) {
1667             // pass "false" because we have to set the error code BEFORE calling
1668             // afterStart()
1669             super(false);
1670
1671             this.isTransient = isTransient;
1672
1673             this.beforeCreateLockManager(engine, new Properties());
1674             this.start();
1675             this.afterStart(engine);
1676         }
1677
1678         @Override
1679         protected BasicDataSource makeDataSource() throws Exception {
1680             when(datasrc.getConnection()).thenAnswer(answer -> {
1681                 if (freeLock) {
1682                     freeLock = false;
1683                     lock.free();
1684                 }
1685
1686                 throw makeEx();
1687             });
1688
1689             doThrow(makeEx()).when(datasrc).close();
1690
1691             return datasrc;
1692         }
1693
1694         protected SQLException makeEx() {
1695             if (isTransient) {
1696                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1697
1698             } else {
1699                 return new SQLException(EXPECTED_EXCEPTION);
1700             }
1701         }
1702     }
1703
1704     /**
1705      * Feature whose locks free themselves while free() is already running.
1706      */
1707     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1708         private boolean relock;
1709
1710         public FreeWithFreeLockingFeature(boolean relock) {
1711             super(true);
1712             this.relock = relock;
1713         }
1714
1715         @Override
1716         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1717             LockCallback callback) {
1718
1719             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1720                 private static final long serialVersionUID = 1L;
1721                 private boolean checked = false;
1722
1723                 @Override
1724                 public boolean isUnavailable() {
1725                     if (checked) {
1726                         return super.isUnavailable();
1727                     }
1728
1729                     checked = true;
1730
1731                     // release and relock
1732                     free();
1733
1734                     if (relock) {
1735                         // run doUnlock
1736                         runLock(1, 0);
1737
1738                         // relock it
1739                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1740                     }
1741
1742                     return false;
1743                 }
1744             };
1745         }
1746     }
1747
1748     /**
1749      * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1750      * extend it, and then unlock it.
1751      */
1752     private class MyThread extends Thread {
1753         AssertionError err = null;
1754
1755         public MyThread() {
1756             setDaemon(true);
1757         }
1758
1759         @Override
1760         public void run() {
1761             try {
1762                 for (int x = 0; x < MAX_LOOPS; ++x) {
1763                     makeAttempt();
1764                 }
1765
1766             } catch (AssertionError e) {
1767                 err = e;
1768             }
1769         }
1770
1771         private void makeAttempt() {
1772             try {
1773                 Semaphore sem = new Semaphore(0);
1774
1775                 LockCallback cb = new LockCallback() {
1776                     @Override
1777                     public void lockAvailable(Lock lock) {
1778                         sem.release();
1779                     }
1780
1781                     @Override
1782                     public void lockUnavailable(Lock lock) {
1783                         sem.release();
1784                     }
1785                 };
1786
1787                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1788
1789                 // wait for callback, whether available or unavailable
1790                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1791                 if (!lock.isActive()) {
1792                     return;
1793                 }
1794
1795                 nsuccesses.incrementAndGet();
1796
1797                 assertEquals(1, nactive.incrementAndGet());
1798
1799                 lock.extend(HOLD_SEC2, cb);
1800                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1801                 assertTrue(lock.isActive());
1802
1803                 // decrement BEFORE free()
1804                 nactive.decrementAndGet();
1805
1806                 assertTrue(lock.free());
1807                 assertTrue(lock.isUnavailable());
1808
1809             } catch (InterruptedException e) {
1810                 Thread.currentThread().interrupt();
1811                 throw new AssertionError("interrupted", e);
1812             }
1813         }
1814     }
1815 }