5a0c6f77f61e9e05ef70ec7994016170e50e853d
[policy/drools-pdp.git] /
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.distributed.locking;
22
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNotNull;
30 import static org.junit.Assert.assertNull;
31 import static org.junit.Assert.assertSame;
32 import static org.junit.Assert.assertTrue;
33 import static org.mockito.ArgumentMatchers.any;
34 import static org.mockito.ArgumentMatchers.anyBoolean;
35 import static org.mockito.ArgumentMatchers.anyLong;
36 import static org.mockito.ArgumentMatchers.eq;
37 import static org.mockito.Mockito.doThrow;
38 import static org.mockito.Mockito.mock;
39 import static org.mockito.Mockito.never;
40 import static org.mockito.Mockito.times;
41 import static org.mockito.Mockito.verify;
42 import static org.mockito.Mockito.when;
43
44 import java.io.ByteArrayInputStream;
45 import java.io.ByteArrayOutputStream;
46 import java.io.ObjectInputStream;
47 import java.io.ObjectOutputStream;
48 import java.sql.Connection;
49 import java.sql.DriverManager;
50 import java.sql.PreparedStatement;
51 import java.sql.ResultSet;
52 import java.sql.SQLException;
53 import java.sql.SQLTransientException;
54 import java.util.ArrayList;
55 import java.util.List;
56 import java.util.Properties;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.RejectedExecutionException;
59 import java.util.concurrent.ScheduledExecutorService;
60 import java.util.concurrent.ScheduledFuture;
61 import java.util.concurrent.Semaphore;
62 import java.util.concurrent.TimeUnit;
63 import java.util.concurrent.atomic.AtomicBoolean;
64 import java.util.concurrent.atomic.AtomicInteger;
65 import java.util.concurrent.atomic.AtomicReference;
66 import org.apache.commons.dbcp2.BasicDataSource;
67 import org.junit.After;
68 import org.junit.AfterClass;
69 import org.junit.Before;
70 import org.junit.BeforeClass;
71 import org.junit.Test;
72 import org.kie.api.runtime.KieSession;
73 import org.mockito.ArgumentCaptor;
74 import org.mockito.Mock;
75 import org.mockito.MockitoAnnotations;
76 import org.onap.policy.common.utils.services.OrderedServiceImpl;
77 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
78 import org.onap.policy.drools.core.PolicySession;
79 import org.onap.policy.drools.core.lock.Lock;
80 import org.onap.policy.drools.core.lock.LockCallback;
81 import org.onap.policy.drools.core.lock.LockState;
82 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
83 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
84 import org.onap.policy.drools.system.PolicyEngine;
85 import org.onap.policy.drools.system.PolicyEngineConstants;
86 import org.powermock.reflect.Whitebox;
87
88 public class DistributedLockManagerTest {
89     private static final long EXPIRE_SEC = 900L;
90     private static final long RETRY_SEC = 60L;
91     private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
92     private static final String OTHER_HOST = "other-host";
93     private static final String OTHER_OWNER = "other-owner";
94     private static final String EXPECTED_EXCEPTION = "expected exception";
95     private static final String DB_CONNECTION =
96                     "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
97     private static final String DB_USER = "user";
98     private static final String DB_PASSWORD = "password";
99     private static final String OWNER_KEY = "my key";
100     private static final String RESOURCE = "my resource";
101     private static final String RESOURCE2 = "my resource #2";
102     private static final String RESOURCE3 = "my resource #3";
103     private static final String RESOURCE4 = "my resource #4";
104     private static final String RESOURCE5 = "my resource #5";
105     private static final int HOLD_SEC = 100;
106     private static final int HOLD_SEC2 = 120;
107     private static final int MAX_THREADS = 5;
108     private static final int MAX_LOOPS = 100;
109     private static final boolean TRANSIENT = true;
110     private static final boolean PERMANENT = false;
111
112     // number of execute() calls before the first lock attempt
113     private static final int PRE_LOCK_EXECS = 1;
114
115     // number of execute() calls before the first schedule attempt
116     private static final int PRE_SCHED_EXECS = 1;
117
118     private static Connection conn = null;
119     private static ScheduledExecutorService saveExec;
120     private static ScheduledExecutorService realExec;
121
122     @Mock
123     private PolicyEngine engine;
124
125     @Mock
126     private KieSession kieSess;
127
128     @Mock
129     private ScheduledExecutorService exsvc;
130
131     @Mock
132     private ScheduledFuture<?> checker;
133
134     @Mock
135     private LockCallback callback;
136
137     @Mock
138     private BasicDataSource datasrc;
139
140     private DistributedLock lock;
141     private PolicySession session;
142
143     private AtomicInteger nactive;
144     private AtomicInteger nsuccesses;
145     private DistributedLockManager feature;
146
147
148     /**
149      * Configures the location of the property files and creates the DB.
150      *
151      * @throws SQLException if the DB cannot be created
152      */
153     @BeforeClass
154     public static void setUpBeforeClass() throws SQLException {
155         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
156
157         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
158
159         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
160                         + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
161                         + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
162             createStmt.executeUpdate();
163         }
164
165         saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
166
167         realExec = Executors.newScheduledThreadPool(3);
168     }
169
170     /**
171      * Restores static fields.
172      */
173     @AfterClass
174     public static void tearDownAfterClass() throws SQLException {
175         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
176         realExec.shutdown();
177         conn.close();
178     }
179
180     /**
181      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
182      * tasks.
183      *
184      * @throws SQLException if the lock records cannot be deleted from the DB
185      */
186     @Before
187     public void setUp() throws SQLException {
188         MockitoAnnotations.initMocks(this);
189
190         // grant() and deny() calls will come through here and be immediately executed
191         session = new PolicySession(null, null, kieSess) {
192             @Override
193             public void insertDrools(Object object) {
194                 ((Runnable) object).run();
195             }
196         };
197
198         session.setPolicySession();
199
200         nactive = new AtomicInteger(0);
201         nsuccesses = new AtomicInteger(0);
202
203         cleanDb();
204
205         feature = new MyLockingFeature(true);
206     }
207
208     @After
209     public void tearDown() throws SQLException {
210         shutdownFeature();
211         cleanDb();
212     }
213
214     private void cleanDb() throws SQLException {
215         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
216             stmt.executeUpdate();
217         }
218     }
219
220     private void shutdownFeature() {
221         if (feature != null) {
222             feature.afterStop(engine);
223             feature = null;
224         }
225     }
226
227     /**
228      * Tests that the feature is found in the expected service sets.
229      */
230     @Test
231     public void testServiceApis() {
232         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
233                         .anyMatch(obj -> obj instanceof DistributedLockManager));
234     }
235
236     @Test
237     public void testGetSequenceNumber() {
238         assertEquals(1000, feature.getSequenceNumber());
239     }
240
241     @Test
242     public void testBeforeCreateLockManager() {
243         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
244     }
245
246     /**
247      * Tests beforeCreate(), when getProperties() throws a runtime exception.
248      */
249     @Test
250     public void testBeforeCreateLockManagerEx() {
251         shutdownFeature();
252
253         feature = new MyLockingFeature(false) {
254             @Override
255             protected Properties getProperties(String fileName) {
256                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
257             }
258         };
259
260         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
261                         .isInstanceOf(DistributedLockManagerException.class);
262     }
263
264     @Test
265     public void testAfterStart() {
266         // verify that cleanup & expire check are both added to the queue
267         verify(exsvc).execute(any());
268         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
269     }
270
271     /**
272      * Tests afterStart(), when thread pool throws a runtime exception.
273      */
274     @Test
275     public void testAfterStartExInThreadPool() {
276         shutdownFeature();
277
278         feature = new MyLockingFeature(false);
279
280         doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
281
282         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
283     }
284
285     @Test
286     public void testDeleteExpiredDbLocks() throws SQLException {
287         // add records: two expired, one not
288         insertRecord(RESOURCE, feature.getUuidString(), -1);
289         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
290         insertRecord(RESOURCE3, OTHER_OWNER, 0);
291         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
292
293         // get the clean-up function and execute it
294         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
295         verify(exsvc).execute(captor.capture());
296
297         long tbegin = System.currentTimeMillis();
298         Runnable action = captor.getValue();
299         action.run();
300
301         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
302         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
303         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
304         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
305
306         assertEquals(2, getRecordCount());
307     }
308
309     /**
310      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
311      *
312      * @throws SQLException if an error occurs
313      */
314     @Test
315     public void testDeleteExpiredDbLocksEx() throws SQLException {
316         feature = new InvalidDbLockingFeature(TRANSIENT);
317
318         // get the clean-up function and execute it
319         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
320         verify(exsvc).execute(captor.capture());
321
322         Runnable action = captor.getValue();
323
324         // should not throw an exception
325         action.run();
326     }
327
328     @Test
329     public void testAfterStop() {
330         shutdownFeature();
331         verify(checker).cancel(anyBoolean());
332
333         feature = new DistributedLockManager();
334
335         // shutdown without calling afterStart()
336
337         shutdownFeature();
338     }
339
340     /**
341      * Tests afterStop(), when the data source throws an exception when close() is called.
342      *
343      * @throws SQLException if an error occurs
344      */
345     @Test
346     public void testAfterStopEx() throws SQLException {
347         shutdownFeature();
348
349         // use a data source that throws an exception when closed
350         feature = new InvalidDbLockingFeature(TRANSIENT);
351
352         assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
353     }
354
355     @Test
356     public void testCreateLock() throws SQLException {
357         verify(exsvc).execute(any());
358
359         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
360         assertTrue(lock.isWaiting());
361
362         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
363
364         // this lock should fail
365         LockCallback callback2 = mock(LockCallback.class);
366         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
367         assertTrue(lock2.isUnavailable());
368         verify(callback2, never()).lockAvailable(lock2);
369         verify(callback2).lockUnavailable(lock2);
370
371         // this should fail, too
372         LockCallback callback3 = mock(LockCallback.class);
373         DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
374         assertTrue(lock3.isUnavailable());
375         verify(callback3, never()).lockAvailable(lock3);
376         verify(callback3).lockUnavailable(lock3);
377
378         // no change to first
379         assertTrue(lock.isWaiting());
380
381         // no callbacks to the first lock
382         verify(callback, never()).lockAvailable(lock);
383         verify(callback, never()).lockUnavailable(lock);
384
385         assertTrue(lock.isWaiting());
386         assertEquals(0, getRecordCount());
387
388         runLock(0, 0);
389         assertTrue(lock.isActive());
390         assertEquals(1, getRecordCount());
391
392         verify(callback).lockAvailable(lock);
393         verify(callback, never()).lockUnavailable(lock);
394
395         // this should succeed
396         DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
397         assertTrue(lock4.isWaiting());
398
399         // after running checker, original records should still remain
400         runChecker(0, 0, EXPIRE_SEC);
401         assertEquals(1, getRecordCount());
402         verify(callback, never()).lockUnavailable(lock);
403     }
404
405     /**
406      * Tests createLock() when the feature is not the latest instance.
407      */
408     @Test
409     public void testCreateLockNotLatestInstance() {
410         DistributedLockManager.setLatestInstance(null);
411
412         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
413         assertTrue(lock.isUnavailable());
414         verify(callback, never()).lockAvailable(any());
415         verify(callback).lockUnavailable(lock);
416     }
417
418     @Test
419     public void testCheckExpired() throws SQLException {
420         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
421         runLock(0, 0);
422
423         LockCallback callback2 = mock(LockCallback.class);
424         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
425         runLock(1, 0);
426
427         LockCallback callback3 = mock(LockCallback.class);
428         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
429         runLock(2, 0);
430
431         LockCallback callback4 = mock(LockCallback.class);
432         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
433         runLock(3, 0);
434
435         LockCallback callback5 = mock(LockCallback.class);
436         final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
437         runLock(4, 0);
438
439         assertEquals(5, getRecordCount());
440
441         // expire one record
442         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
443
444         // change host of another record
445         updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
446
447         // change uuid of another record
448         updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
449
450
451         // run the checker
452         runChecker(0, 0, EXPIRE_SEC);
453
454
455         // check lock states
456         assertTrue(lock.isUnavailable());
457         assertTrue(lock2.isActive());
458         assertTrue(lock3.isUnavailable());
459         assertTrue(lock4.isActive());
460         assertTrue(lock5.isUnavailable());
461
462         // allow callbacks
463         runLock(2, 2);
464         runLock(3, 1);
465         runLock(4, 0);
466         verify(callback).lockUnavailable(lock);
467         verify(callback3).lockUnavailable(lock3);
468         verify(callback5).lockUnavailable(lock5);
469
470         verify(callback2, never()).lockUnavailable(lock2);
471         verify(callback4, never()).lockUnavailable(lock4);
472
473
474         // another check should have been scheduled, with the normal interval
475         runChecker(1, 0, EXPIRE_SEC);
476     }
477
478     /**
479      * Tests checkExpired(), when schedule() throws an exception.
480      */
481     @Test
482     public void testCheckExpiredExecRejected() {
483         // arrange for execution to be rejected
484         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
485                         .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
486
487         runChecker(0, 0, EXPIRE_SEC);
488     }
489
490     /**
491      * Tests checkExpired(), when getConnection() throws an exception.
492      */
493     @Test
494     public void testCheckExpiredSqlEx() {
495         // use a data source that throws an exception when getConnection() is called
496         feature = new InvalidDbLockingFeature(TRANSIENT);
497
498         runChecker(0, 0, EXPIRE_SEC);
499
500         // it should have scheduled another check, sooner
501         runChecker(0, 0, RETRY_SEC);
502     }
503
504     /**
505      * Tests checkExpired(), when getConnection() throws an exception and the feature is
506      * no longer alive.
507      */
508     @Test
509     public void testCheckExpiredSqlExFeatureStopped() {
510         // use a data source that throws an exception when getConnection() is called
511         feature = new InvalidDbLockingFeature(TRANSIENT) {
512             @Override
513             protected SQLException makeEx() {
514                 this.stop();
515                 return super.makeEx();
516             }
517         };
518
519         runChecker(0, 0, EXPIRE_SEC);
520
521         // it should NOT have scheduled another check
522         verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
523     }
524
525     @Test
526     public void testExpireLocks() throws SQLException {
527         AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
528
529         feature = new MyLockingFeature(true) {
530             @Override
531             protected BasicDataSource makeDataSource() throws Exception {
532                 // get the real data source
533                 BasicDataSource src2 = super.makeDataSource();
534
535                 when(datasrc.getConnection()).thenAnswer(answer -> {
536                     DistributedLock lck = freeLock.getAndSet(null);
537                     if (lck != null) {
538                         // free it
539                         lck.free();
540
541                         // run its doUnlock
542                         runLock(4, 0);
543                     }
544
545                     return src2.getConnection();
546                 });
547
548                 return datasrc;
549             }
550         };
551
552         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
553         runLock(0, 0);
554
555         LockCallback callback2 = mock(LockCallback.class);
556         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
557         runLock(1, 0);
558
559         LockCallback callback3 = mock(LockCallback.class);
560         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
561         // don't run doLock for lock3 - leave it in the waiting state
562
563         LockCallback callback4 = mock(LockCallback.class);
564         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
565         runLock(3, 0);
566
567         assertEquals(3, getRecordCount());
568
569         // expire one record
570         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
571
572         // arrange to free lock4 while the checker is running
573         freeLock.set(lock4);
574
575         // run the checker
576         runChecker(0, 0, EXPIRE_SEC);
577
578
579         // check lock states
580         assertTrue(lock.isUnavailable());
581         assertTrue(lock2.isActive());
582         assertTrue(lock3.isWaiting());
583         assertTrue(lock4.isUnavailable());
584
585         runLock(4, 0);
586         verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
587
588         verify(callback).lockUnavailable(lock);
589         verify(callback2, never()).lockUnavailable(lock2);
590         verify(callback3, never()).lockUnavailable(lock3);
591         verify(callback4, never()).lockUnavailable(lock4);
592     }
593
594     @Test
595     public void testDistributedLockNoArgs() {
596         DistributedLock lock = new DistributedLock();
597         assertNull(lock.getResourceId());
598         assertNull(lock.getOwnerKey());
599         assertNull(lock.getCallback());
600         assertEquals(0, lock.getHoldSec());
601     }
602
603     @Test
604     public void testDistributedLock() {
605         assertThatIllegalArgumentException()
606                         .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
607                         .withMessageContaining("holdSec");
608
609         // should generate no exception
610         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
611     }
612
613     @Test
614     public void testDistributedLockSerializable() throws Exception {
615         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
616         lock = roundTrip(lock);
617
618         assertTrue(lock.isWaiting());
619
620         assertEquals(RESOURCE, lock.getResourceId());
621         assertEquals(OWNER_KEY, lock.getOwnerKey());
622         assertNull(lock.getCallback());
623         assertEquals(HOLD_SEC, lock.getHoldSec());
624     }
625
626     @Test
627     public void testGrant() {
628         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
629         assertFalse(lock.isActive());
630
631         // execute the doLock() call
632         runLock(0, 0);
633
634         assertTrue(lock.isActive());
635
636         // the callback for the lock should have been run in the foreground thread
637         verify(callback).lockAvailable(lock);
638     }
639
640     @Test
641     public void testDistributedLockDeny() {
642         // get a lock
643         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
644
645         // get another lock - should fail
646         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
647
648         assertTrue(lock.isUnavailable());
649
650         // the callback for the second lock should have been run in the foreground thread
651         verify(callback).lockUnavailable(lock);
652
653         // should only have a request for the first lock
654         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
655     }
656
657     @Test
658     public void testDistributedLockFree() {
659         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
660
661         assertTrue(lock.free());
662         assertTrue(lock.isUnavailable());
663
664         // run both requests associated with the lock
665         runLock(0, 1);
666         runLock(1, 0);
667
668         // should not have changed state
669         assertTrue(lock.isUnavailable());
670
671         // attempt to free it again
672         assertFalse(lock.free());
673
674         // should not have queued anything else
675         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
676
677         // new lock should succeed
678         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
679         assertTrue(lock2 != lock);
680         assertTrue(lock2.isWaiting());
681     }
682
683     /**
684      * Tests that free() works on a serialized lock with a new feature.
685      *
686      * @throws Exception if an error occurs
687      */
688     @Test
689     public void testDistributedLockFreeSerialized() throws Exception {
690         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
691
692         feature = new MyLockingFeature(true);
693
694         lock = roundTrip(lock);
695         assertTrue(lock.free());
696         assertTrue(lock.isUnavailable());
697     }
698
699     /**
700      * Tests free() on a serialized lock without a feature.
701      *
702      * @throws Exception if an error occurs
703      */
704     @Test
705     public void testDistributedLockFreeNoFeature() throws Exception {
706         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
707
708         DistributedLockManager.setLatestInstance(null);
709
710         lock = roundTrip(lock);
711         assertFalse(lock.free());
712         assertTrue(lock.isUnavailable());
713     }
714
715     /**
716      * Tests the case where the lock is freed and doUnlock called between the call to
717      * isUnavailable() and the call to compute().
718      */
719     @Test
720     public void testDistributedLockFreeUnlocked() {
721         feature = new FreeWithFreeLockingFeature(true);
722
723         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
724
725         assertFalse(lock.free());
726         assertTrue(lock.isUnavailable());
727     }
728
729     /**
730      * Tests the case where the lock is freed, but doUnlock is not completed, between the
731      * call to isUnavailable() and the call to compute().
732      */
733     @Test
734     public void testDistributedLockFreeLockFreed() {
735         feature = new FreeWithFreeLockingFeature(false);
736
737         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
738
739         assertFalse(lock.free());
740         assertTrue(lock.isUnavailable());
741     }
742
743     @Test
744     public void testDistributedLockExtend() {
745         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
746
747         // lock2 should be denied - called back by this thread
748         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
749         verify(callback, never()).lockAvailable(lock2);
750         verify(callback).lockUnavailable(lock2);
751
752         // lock2 will still be denied - called back by this thread
753         lock2.extend(HOLD_SEC, callback);
754         verify(callback, times(2)).lockUnavailable(lock2);
755
756         // force lock2 to be active - should still be denied
757         Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
758         lock2.extend(HOLD_SEC, callback);
759         verify(callback, times(3)).lockUnavailable(lock2);
760
761         assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
762                         .withMessageContaining("holdSec");
763
764         // execute doLock()
765         runLock(0, 0);
766         assertTrue(lock.isActive());
767
768         // now extend the first lock
769         LockCallback callback2 = mock(LockCallback.class);
770         lock.extend(HOLD_SEC2, callback2);
771         assertTrue(lock.isWaiting());
772
773         // execute doExtend()
774         runLock(1, 0);
775         lock.extend(HOLD_SEC2, callback2);
776         assertEquals(HOLD_SEC2, lock.getHoldSec());
777         verify(callback2).lockAvailable(lock);
778         verify(callback2, never()).lockUnavailable(lock);
779     }
780
781     /**
782      * Tests that extend() works on a serialized lock with a new feature.
783      *
784      * @throws Exception if an error occurs
785      */
786     @Test
787     public void testDistributedLockExtendSerialized() throws Exception {
788         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
789
790         // run doLock
791         runLock(0, 0);
792         assertTrue(lock.isActive());
793
794         feature = new MyLockingFeature(true);
795
796         lock = roundTrip(lock);
797         assertTrue(lock.isActive());
798
799         LockCallback scallback = mock(LockCallback.class);
800
801         lock.extend(HOLD_SEC, scallback);
802         assertTrue(lock.isWaiting());
803
804         // run doExtend (in new feature)
805         runLock(0, 0);
806         assertTrue(lock.isActive());
807
808         verify(scallback).lockAvailable(lock);
809         verify(scallback, never()).lockUnavailable(lock);
810     }
811
812     /**
813      * Tests extend() on a serialized lock without a feature.
814      *
815      * @throws Exception if an error occurs
816      */
817     @Test
818     public void testDistributedLockExtendNoFeature() throws Exception {
819         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
820
821         // run doLock
822         runLock(0, 0);
823         assertTrue(lock.isActive());
824
825         DistributedLockManager.setLatestInstance(null);
826
827         lock = roundTrip(lock);
828         assertTrue(lock.isActive());
829
830         LockCallback scallback = mock(LockCallback.class);
831
832         lock.extend(HOLD_SEC, scallback);
833         assertTrue(lock.isUnavailable());
834
835         verify(scallback, never()).lockAvailable(lock);
836         verify(scallback).lockUnavailable(lock);
837     }
838
839     /**
840      * Tests the case where the lock is freed and doUnlock called between the call to
841      * isUnavailable() and the call to compute().
842      */
843     @Test
844     public void testDistributedLockExtendUnlocked() {
845         feature = new FreeWithFreeLockingFeature(true);
846
847         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
848
849         lock.extend(HOLD_SEC2, callback);
850         assertTrue(lock.isUnavailable());
851         verify(callback).lockUnavailable(lock);
852     }
853
854     /**
855      * Tests the case where the lock is freed, but doUnlock is not completed, between the
856      * call to isUnavailable() and the call to compute().
857      */
858     @Test
859     public void testDistributedLockExtendLockFreed() {
860         feature = new FreeWithFreeLockingFeature(false);
861
862         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
863
864         lock.extend(HOLD_SEC2, callback);
865         assertTrue(lock.isUnavailable());
866         verify(callback).lockUnavailable(lock);
867     }
868
869     @Test
870     public void testDistributedLockScheduleRequest() {
871         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
872         runLock(0, 0);
873
874         verify(callback).lockAvailable(lock);
875     }
876
877     @Test
878     public void testDistributedLockRescheduleRequest() throws SQLException {
879         // use a data source that throws an exception when getConnection() is called
880         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
881         feature = invfeat;
882
883         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
884
885         // invoke doLock - should fail and reschedule
886         runLock(0, 0);
887
888         // should still be waiting
889         assertTrue(lock.isWaiting());
890         verify(callback, never()).lockUnavailable(lock);
891
892         // free the lock while doLock is executing
893         invfeat.freeLock = true;
894
895         // try scheduled request - should just invoke doUnlock
896         runSchedule(0, 0);
897
898         // should still be waiting
899         assertTrue(lock.isUnavailable());
900         verify(callback, never()).lockUnavailable(lock);
901
902         // should have scheduled a retry of doUnlock
903         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
904     }
905
906     @Test
907     public void testDistributedLockGetNextRequest() {
908         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
909
910         /*
911          * run doLock. This should cause getNextRequest() to be called twice, once with a
912          * request in the queue, and the second time with request=null.
913          */
914         runLock(0, 0);
915     }
916
917     /**
918      * Tests getNextRequest(), where the same request is still in the queue the second
919      * time it's called.
920      */
921     @Test
922     public void testDistributedLockGetNextRequestSameRequest() {
923         // force reschedule to be invoked
924         feature = new InvalidDbLockingFeature(TRANSIENT);
925
926         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
927
928         /*
929          * run doLock. This should cause getNextRequest() to be called twice, once with a
930          * request in the queue, and the second time with the same request again.
931          */
932         runLock(0, 0);
933
934         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
935     }
936
937     @Test
938     public void testDistributedLockDoRequest() throws SQLException {
939         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
940
941         assertTrue(lock.isWaiting());
942
943         // run doLock via doRequest
944         runLock(0, 0);
945
946         assertTrue(lock.isActive());
947     }
948
949     /**
950      * Tests doRequest(), when doRequest() is already running within another thread.
951      */
952     @Test
953     public void testDistributedLockDoRequestBusy() {
954         /*
955          * this feature will invoke a request in a background thread while it's being run
956          * in a foreground thread.
957          */
958         AtomicBoolean running = new AtomicBoolean(false);
959         AtomicBoolean returned = new AtomicBoolean(false);
960
961         feature = new MyLockingFeature(true) {
962             @Override
963             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
964                             LockCallback callback) {
965                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
966                     private static final long serialVersionUID = 1L;
967
968                     @Override
969                     protected boolean doDbInsert(Connection conn) throws SQLException {
970                         if (running.get()) {
971                             // already inside the thread - don't recurse any further
972                             return super.doDbInsert(conn);
973                         }
974
975                         running.set(true);
976
977                         Thread thread = new Thread(() -> {
978                             // run doLock from within the new thread
979                             runLock(0, 0);
980                         });
981                         thread.setDaemon(true);
982                         thread.start();
983
984                         // wait for the background thread to complete before continuing
985                         try {
986                             thread.join(5000);
987                         } catch (InterruptedException ignore) {
988                             Thread.currentThread().interrupt();
989                         }
990
991                         returned.set(!thread.isAlive());
992
993                         return super.doDbInsert(conn);
994                     }
995                 };
996             }
997         };
998
999         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1000
1001         // run doLock
1002         runLock(0, 0);
1003
1004         assertTrue(returned.get());
1005     }
1006
1007     /**
1008      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1009      *
1010      * @throws SQLException if an error occurs
1011      */
1012     @Test
1013     public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1014         // throw run-time exception
1015         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1016
1017         // use a data source that throws an exception when getConnection() is called
1018         feature = new MyLockingFeature(true) {
1019             @Override
1020             protected BasicDataSource makeDataSource() throws Exception {
1021                 return datasrc;
1022             }
1023         };
1024
1025         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1026
1027         // invoke doLock - should NOT reschedule
1028         runLock(0, 0);
1029
1030         assertTrue(lock.isUnavailable());
1031         verify(callback).lockUnavailable(lock);
1032
1033         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1034     }
1035
1036     /**
1037      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1038      * state.
1039      *
1040      * @throws SQLException if an error occurs
1041      */
1042     @Test
1043     public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1044         // throw run-time exception
1045         when(datasrc.getConnection()).thenAnswer(answer -> {
1046             lock.free();
1047             throw new IllegalStateException(EXPECTED_EXCEPTION);
1048         });
1049
1050         // use a data source that throws an exception when getConnection() is called
1051         feature = new MyLockingFeature(true) {
1052             @Override
1053             protected BasicDataSource makeDataSource() throws Exception {
1054                 return datasrc;
1055             }
1056         };
1057
1058         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1059
1060         // invoke doLock - should NOT reschedule
1061         runLock(0, 0);
1062
1063         assertTrue(lock.isUnavailable());
1064         verify(callback, never()).lockUnavailable(lock);
1065
1066         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1067     }
1068
1069     /**
1070      * Tests doRequest() when the retry count gets exhausted.
1071      */
1072     @Test
1073     public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1074         // use a data source that throws an exception when getConnection() is called
1075         feature = new InvalidDbLockingFeature(TRANSIENT);
1076
1077         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1078
1079         // invoke doLock - should fail and reschedule
1080         runLock(0, 0);
1081
1082         // should still be waiting
1083         assertTrue(lock.isWaiting());
1084         verify(callback, never()).lockUnavailable(lock);
1085
1086         // try again, via SCHEDULER - first retry fails
1087         runSchedule(0, 0);
1088
1089         // should still be waiting
1090         assertTrue(lock.isWaiting());
1091         verify(callback, never()).lockUnavailable(lock);
1092
1093         // try again, via SCHEDULER - final retry fails
1094         runSchedule(1, 0);
1095         assertTrue(lock.isUnavailable());
1096
1097         // now callback should have been called
1098         verify(callback).lockUnavailable(lock);
1099     }
1100
1101     /**
1102      * Tests doRequest() when a non-transient DB exception is thrown.
1103      */
1104     @Test
1105     public void testDistributedLockDoRequestNotTransient() {
1106         /*
1107          * use a data source that throws a PERMANENT exception when getConnection() is
1108          * called
1109          */
1110         feature = new InvalidDbLockingFeature(PERMANENT);
1111
1112         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1113
1114         // invoke doLock - should fail
1115         runLock(0, 0);
1116
1117         assertTrue(lock.isUnavailable());
1118         verify(callback).lockUnavailable(lock);
1119
1120         // should not have scheduled anything new
1121         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1122         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1123     }
1124
1125     @Test
1126     public void testDistributedLockDoLock() throws SQLException {
1127         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1128
1129         // invoke doLock - should simply do an insert
1130         long tbegin = System.currentTimeMillis();
1131         runLock(0, 0);
1132
1133         assertEquals(1, getRecordCount());
1134         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1135         verify(callback).lockAvailable(lock);
1136     }
1137
1138     /**
1139      * Tests doLock() when the lock is freed before doLock runs.
1140      *
1141      * @throws SQLException if an error occurs
1142      */
1143     @Test
1144     public void testDistributedLockDoLockFreed() throws SQLException {
1145         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1146
1147         lock.setState(LockState.UNAVAILABLE);
1148
1149         // invoke doLock - should do nothing
1150         runLock(0, 0);
1151
1152         assertEquals(0, getRecordCount());
1153
1154         verify(callback, never()).lockAvailable(lock);
1155     }
1156
1157     /**
1158      * Tests doLock() when a DB exception is thrown.
1159      */
1160     @Test
1161     public void testDistributedLockDoLockEx() {
1162         // use a data source that throws an exception when getConnection() is called
1163         feature = new InvalidDbLockingFeature(PERMANENT);
1164
1165         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1166
1167         // invoke doLock - should simply do an insert
1168         runLock(0, 0);
1169
1170         // lock should have failed due to exception
1171         verify(callback).lockUnavailable(lock);
1172     }
1173
1174     /**
1175      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1176      * to be called.
1177      */
1178     @Test
1179     public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1180         // insert an expired record
1181         insertRecord(RESOURCE, feature.getUuidString(), 0);
1182
1183         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1184
1185         // invoke doLock - should simply do an update
1186         runLock(0, 0);
1187         verify(callback).lockAvailable(lock);
1188     }
1189
1190     /**
1191      * Tests doLock() when a locked record already exists.
1192      */
1193     @Test
1194     public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1195         // insert an expired record
1196         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1197
1198         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1199
1200         // invoke doLock
1201         runLock(0, 0);
1202
1203         // lock should have failed because it's already locked
1204         verify(callback).lockUnavailable(lock);
1205     }
1206
1207     @Test
1208     public void testDistributedLockDoUnlock() throws SQLException {
1209         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1210
1211         // invoke doLock()
1212         runLock(0, 0);
1213
1214         lock.free();
1215
1216         // invoke doUnlock()
1217         long tbegin = System.currentTimeMillis();
1218         runLock(1, 0);
1219
1220         assertEquals(0, getRecordCount());
1221         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1222
1223         assertTrue(lock.isUnavailable());
1224
1225         // no more callbacks should have occurred
1226         verify(callback, times(1)).lockAvailable(lock);
1227         verify(callback, never()).lockUnavailable(lock);
1228     }
1229
1230     /**
1231      * Tests doUnlock() when a DB exception is thrown.
1232      *
1233      * @throws SQLException if an error occurs
1234      */
1235     @Test
1236     public void testDistributedLockDoUnlockEx() throws SQLException {
1237         feature = new InvalidDbLockingFeature(PERMANENT);
1238
1239         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1240
1241         // do NOT invoke doLock() - it will fail without a DB connection
1242
1243         lock.free();
1244
1245         // invoke doUnlock()
1246         runLock(1, 0);
1247
1248         assertTrue(lock.isUnavailable());
1249
1250         // no more callbacks should have occurred
1251         verify(callback, never()).lockAvailable(lock);
1252         verify(callback, never()).lockUnavailable(lock);
1253     }
1254
1255     @Test
1256     public void testDistributedLockDoExtend() throws SQLException {
1257         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1258         runLock(0, 0);
1259
1260         LockCallback callback2 = mock(LockCallback.class);
1261         lock.extend(HOLD_SEC2, callback2);
1262
1263         // call doExtend()
1264         long tbegin = System.currentTimeMillis();
1265         runLock(1, 0);
1266
1267         assertEquals(1, getRecordCount());
1268         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1269
1270         assertTrue(lock.isActive());
1271
1272         // no more callbacks should have occurred
1273         verify(callback).lockAvailable(lock);
1274         verify(callback, never()).lockUnavailable(lock);
1275
1276         // extension should have succeeded
1277         verify(callback2).lockAvailable(lock);
1278         verify(callback2, never()).lockUnavailable(lock);
1279     }
1280
1281     /**
1282      * Tests doExtend() when the lock is freed before doExtend runs.
1283      *
1284      * @throws SQLException if an error occurs
1285      */
1286     @Test
1287     public void testDistributedLockDoExtendFreed() throws SQLException {
1288         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1289         lock.extend(HOLD_SEC2, callback);
1290
1291         lock.setState(LockState.UNAVAILABLE);
1292
1293         // invoke doExtend - should do nothing
1294         runLock(1, 0);
1295
1296         assertEquals(0, getRecordCount());
1297
1298         verify(callback, never()).lockAvailable(lock);
1299     }
1300
1301     /**
1302      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1303      * insert.
1304      *
1305      * @throws SQLException if an error occurs
1306      */
1307     @Test
1308     public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1309         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1310         runLock(0, 0);
1311
1312         LockCallback callback2 = mock(LockCallback.class);
1313         lock.extend(HOLD_SEC2, callback2);
1314
1315         // delete the record so it's forced to re-insert it
1316         cleanDb();
1317
1318         // call doExtend()
1319         long tbegin = System.currentTimeMillis();
1320         runLock(1, 0);
1321
1322         assertEquals(1, getRecordCount());
1323         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1324
1325         assertTrue(lock.isActive());
1326
1327         // no more callbacks should have occurred
1328         verify(callback).lockAvailable(lock);
1329         verify(callback, never()).lockUnavailable(lock);
1330
1331         // extension should have succeeded
1332         verify(callback2).lockAvailable(lock);
1333         verify(callback2, never()).lockUnavailable(lock);
1334     }
1335
1336     /**
1337      * Tests doExtend() when both update and insert fail.
1338      *
1339      * @throws SQLException if an error occurs
1340      */
1341     @Test
1342     public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1343         /*
1344          * this feature will create a lock that returns false when doDbUpdate() is
1345          * invoked, or when doDbInsert() is invoked a second time
1346          */
1347         feature = new MyLockingFeature(true) {
1348             @Override
1349             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1350                             LockCallback callback) {
1351                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1352                     private static final long serialVersionUID = 1L;
1353                     private int ntimes = 0;
1354
1355                     @Override
1356                     protected boolean doDbInsert(Connection conn) throws SQLException {
1357                         if (ntimes++ > 0) {
1358                             return false;
1359                         }
1360
1361                         return super.doDbInsert(conn);
1362                     }
1363
1364                     @Override
1365                     protected boolean doDbUpdate(Connection conn) {
1366                         return false;
1367                     }
1368                 };
1369             }
1370         };
1371
1372         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1373         runLock(0, 0);
1374
1375         LockCallback callback2 = mock(LockCallback.class);
1376         lock.extend(HOLD_SEC2, callback2);
1377
1378         // call doExtend()
1379         runLock(1, 0);
1380
1381         assertTrue(lock.isUnavailable());
1382
1383         // no more callbacks should have occurred
1384         verify(callback).lockAvailable(lock);
1385         verify(callback, never()).lockUnavailable(lock);
1386
1387         // extension should have failed
1388         verify(callback2, never()).lockAvailable(lock);
1389         verify(callback2).lockUnavailable(lock);
1390     }
1391
1392     /**
1393      * Tests doExtend() when an exception occurs.
1394      *
1395      * @throws SQLException if an error occurs
1396      */
1397     @Test
1398     public void testDistributedLockDoExtendEx() throws SQLException {
1399         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1400         runLock(0, 0);
1401
1402         /*
1403          * delete the record and insert one with a different owner, which will cause
1404          * doDbInsert() to throw an exception
1405          */
1406         cleanDb();
1407         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1408
1409         LockCallback callback2 = mock(LockCallback.class);
1410         lock.extend(HOLD_SEC2, callback2);
1411
1412         // call doExtend()
1413         runLock(1, 0);
1414
1415         assertTrue(lock.isUnavailable());
1416
1417         // no more callbacks should have occurred
1418         verify(callback).lockAvailable(lock);
1419         verify(callback, never()).lockUnavailable(lock);
1420
1421         // extension should have failed
1422         verify(callback2, never()).lockAvailable(lock);
1423         verify(callback2).lockUnavailable(lock);
1424     }
1425
1426     @Test
1427     public void testDistributedLockToString() {
1428         String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1429         assertNotNull(text);
1430         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1431     }
1432
1433     @Test
1434     public void testMakeThreadPool() {
1435         // use a REAL feature to test this
1436         feature = new DistributedLockManager();
1437
1438         // this should create a thread pool
1439         feature.beforeCreateLockManager(engine, new Properties());
1440         feature.afterStart(engine);
1441
1442         assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
1443     }
1444
1445     /**
1446      * Performs a multi-threaded test of the locking facility.
1447      *
1448      * @throws InterruptedException if the current thread is interrupted while waiting for
1449      *         the background threads to complete
1450      */
1451     @Test
1452     public void testMultiThreaded() throws InterruptedException {
1453         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1454
1455         feature = new DistributedLockManager();
1456         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1457         feature.afterStart(PolicyEngineConstants.getManager());
1458
1459         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1460         for (int x = 0; x < MAX_THREADS; ++x) {
1461             threads.add(new MyThread());
1462         }
1463
1464         threads.forEach(Thread::start);
1465
1466         for (MyThread thread : threads) {
1467             thread.join(6000);
1468             assertFalse(thread.isAlive());
1469         }
1470
1471         for (MyThread thread : threads) {
1472             if (thread.err != null) {
1473                 throw thread.err;
1474             }
1475         }
1476
1477         assertTrue(nsuccesses.get() > 0);
1478     }
1479
1480     private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1481                     boolean waitForLock) {
1482         return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1483     }
1484
1485     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1486         ByteArrayOutputStream baos = new ByteArrayOutputStream();
1487         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1488             oos.writeObject(lock);
1489         }
1490
1491         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1492         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1493             return (DistributedLock) ois.readObject();
1494         }
1495     }
1496
1497     /**
1498      * Runs the checkExpired() action.
1499      *
1500      * @param nskip number of actions in the work queue to skip
1501      * @param nadditional number of additional actions that appear in the work queue
1502      *        <i>after</i> the checkExpired action
1503      * @param schedSec number of seconds for which the checker should have been scheduled
1504      */
1505     private void runChecker(int nskip, int nadditional, long schedSec) {
1506         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1507         verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1508         Runnable action = captor.getAllValues().get(nskip);
1509         action.run();
1510     }
1511
1512     /**
1513      * Runs a lock action (e.g., doLock, doUnlock).
1514      *
1515      * @param nskip number of actions in the work queue to skip
1516      * @param nadditional number of additional actions that appear in the work queue
1517      *        <i>after</i> the desired action
1518      */
1519     void runLock(int nskip, int nadditional) {
1520         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1521         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1522
1523         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1524         action.run();
1525     }
1526
1527     /**
1528      * Runs a scheduled action (e.g., "retry" action).
1529      *
1530      * @param nskip number of actions in the work queue to skip
1531      * @param nadditional number of additional actions that appear in the work queue
1532      *        <i>after</i> the desired action
1533      */
1534     void runSchedule(int nskip, int nadditional) {
1535         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1536         verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1537
1538         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1539         action.run();
1540     }
1541
1542     /**
1543      * Gets a count of the number of lock records in the DB.
1544      *
1545      * @return the number of lock records in the DB
1546      * @throws SQLException if an error occurs accessing the DB
1547      */
1548     private int getRecordCount() throws SQLException {
1549         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1550                         ResultSet result = stmt.executeQuery()) {
1551
1552             if (result.next()) {
1553                 return result.getInt(1);
1554
1555             } else {
1556                 return 0;
1557             }
1558         }
1559     }
1560
1561     /**
1562      * Determines if there is a record for the given resource whose expiration time is in
1563      * the expected range.
1564      *
1565      * @param resourceId ID of the resource of interest
1566      * @param uuidString UUID string of the owner
1567      * @param holdSec seconds for which the lock was to be held
1568      * @param tbegin earliest time, in milliseconds, at which the record could have been
1569      *        inserted into the DB
1570      * @return {@code true} if a record is found, {@code false} otherwise
1571      * @throws SQLException if an error occurs accessing the DB
1572      */
1573     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1574         try (PreparedStatement stmt =
1575                         conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1576                                         + " WHERE resourceId=? AND host=? AND owner=?")) {
1577
1578             stmt.setString(1, resourceId);
1579             stmt.setString(2, feature.getHostName());
1580             stmt.setString(3, uuidString);
1581
1582             try (ResultSet result = stmt.executeQuery()) {
1583                 if (result.next()) {
1584                     int remaining = result.getInt(1);
1585                     long maxDiff = System.currentTimeMillis() - tbegin;
1586                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
1587
1588                 } else {
1589                     return false;
1590                 }
1591             }
1592         }
1593     }
1594
1595     /**
1596      * Inserts a record into the DB.
1597      *
1598      * @param resourceId ID of the resource of interest
1599      * @param uuidString UUID string of the owner
1600      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1601      * @throws SQLException if an error occurs accessing the DB
1602      */
1603     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1604         this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
1605     }
1606
1607     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1608                     throws SQLException {
1609         try (PreparedStatement stmt =
1610                         conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1611                                         + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1612
1613             stmt.setString(1, resourceId);
1614             stmt.setString(2, hostName);
1615             stmt.setString(3, uuidString);
1616             stmt.setInt(4, expireOffset);
1617
1618             assertEquals(1, stmt.executeUpdate());
1619         }
1620     }
1621
1622     /**
1623      * Updates a record in the DB.
1624      *
1625      * @param resourceId ID of the resource of interest
1626      * @param newUuid UUID string of the <i>new</i> owner
1627      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1628      * @throws SQLException if an error occurs accessing the DB
1629      */
1630     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1631         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1632                         + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1633
1634             stmt.setString(1, newHost);
1635             stmt.setString(2, newUuid);
1636             stmt.setInt(3, expireOffset);
1637             stmt.setString(4, resourceId);
1638
1639             assertEquals(1, stmt.executeUpdate());
1640         }
1641     }
1642
1643     /**
1644      * Feature that uses <i>exsvc</i> to execute requests.
1645      */
1646     private class MyLockingFeature extends DistributedLockManager {
1647
1648         public MyLockingFeature(boolean init) {
1649             shutdownFeature();
1650
1651             exsvc = mock(ScheduledExecutorService.class);
1652             when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1653             Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1654
1655             if (init) {
1656                 beforeCreateLockManager(engine, new Properties());
1657                 start();
1658                 afterStart(engine);
1659             }
1660         }
1661     }
1662
1663     /**
1664      * Feature whose data source all throws exceptions.
1665      */
1666     private class InvalidDbLockingFeature extends MyLockingFeature {
1667         private boolean isTransient;
1668         private boolean freeLock = false;
1669
1670         public InvalidDbLockingFeature(boolean isTransient) {
1671             // pass "false" because we have to set the error code BEFORE calling
1672             // afterStart()
1673             super(false);
1674
1675             this.isTransient = isTransient;
1676
1677             this.beforeCreateLockManager(engine, new Properties());
1678             this.start();
1679             this.afterStart(engine);
1680         }
1681
1682         @Override
1683         protected BasicDataSource makeDataSource() throws Exception {
1684             when(datasrc.getConnection()).thenAnswer(answer -> {
1685                 if (freeLock) {
1686                     freeLock = false;
1687                     lock.free();
1688                 }
1689
1690                 throw makeEx();
1691             });
1692
1693             doThrow(makeEx()).when(datasrc).close();
1694
1695             return datasrc;
1696         }
1697
1698         protected SQLException makeEx() {
1699             if (isTransient) {
1700                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1701
1702             } else {
1703                 return new SQLException(EXPECTED_EXCEPTION);
1704             }
1705         }
1706     }
1707
1708     /**
1709      * Feature whose locks free themselves while free() is already running.
1710      */
1711     private class FreeWithFreeLockingFeature extends MyLockingFeature {
1712         private boolean relock;
1713
1714         public FreeWithFreeLockingFeature(boolean relock) {
1715             super(true);
1716             this.relock = relock;
1717         }
1718
1719         @Override
1720         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1721                         LockCallback callback) {
1722
1723             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1724                 private static final long serialVersionUID = 1L;
1725                 private boolean checked = false;
1726
1727                 @Override
1728                 public boolean isUnavailable() {
1729                     if (checked) {
1730                         return super.isUnavailable();
1731                     }
1732
1733                     checked = true;
1734
1735                     // release and relock
1736                     free();
1737
1738                     if (relock) {
1739                         // run doUnlock
1740                         runLock(1, 0);
1741
1742                         // relock it
1743                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1744                     }
1745
1746                     return false;
1747                 }
1748             };
1749         }
1750     }
1751
1752     /**
1753      * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1754      * extend it, and then unlock it.
1755      */
1756     private class MyThread extends Thread {
1757         AssertionError err = null;
1758
1759         public MyThread() {
1760             setDaemon(true);
1761         }
1762
1763         @Override
1764         public void run() {
1765             try {
1766                 for (int x = 0; x < MAX_LOOPS; ++x) {
1767                     makeAttempt();
1768                 }
1769
1770             } catch (AssertionError e) {
1771                 err = e;
1772             }
1773         }
1774
1775         private void makeAttempt() {
1776             try {
1777                 Semaphore sem = new Semaphore(0);
1778
1779                 LockCallback cb = new LockCallback() {
1780                     @Override
1781                     public void lockAvailable(Lock lock) {
1782                         sem.release();
1783                     }
1784
1785                     @Override
1786                     public void lockUnavailable(Lock lock) {
1787                         sem.release();
1788                     }
1789                 };
1790
1791                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1792
1793                 // wait for callback, whether available or unavailable
1794                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1795                 if (!lock.isActive()) {
1796                     return;
1797                 }
1798
1799                 nsuccesses.incrementAndGet();
1800
1801                 assertEquals(1, nactive.incrementAndGet());
1802
1803                 lock.extend(HOLD_SEC2, cb);
1804                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1805                 assertTrue(lock.isActive());
1806
1807                 // decrement BEFORE free()
1808                 nactive.decrementAndGet();
1809
1810                 assertTrue(lock.free());
1811                 assertTrue(lock.isUnavailable());
1812
1813             } catch (InterruptedException e) {
1814                 Thread.currentThread().interrupt();
1815                 throw new AssertionError("interrupted", e);
1816             }
1817         }
1818     }
1819 }