2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32 import static org.mockito.Matchers.any;
33 import static org.mockito.Matchers.anyLong;
34 import static org.mockito.Matchers.eq;
35 import static org.mockito.Mockito.doThrow;
36 import static org.mockito.Mockito.mock;
37 import static org.mockito.Mockito.never;
38 import static org.mockito.Mockito.times;
39 import static org.mockito.Mockito.verify;
40 import static org.mockito.Mockito.when;
42 import java.io.ByteArrayInputStream;
43 import java.io.ByteArrayOutputStream;
44 import java.io.ObjectInputStream;
45 import java.io.ObjectOutputStream;
46 import java.sql.Connection;
47 import java.sql.DriverManager;
48 import java.sql.PreparedStatement;
49 import java.sql.ResultSet;
50 import java.sql.SQLException;
51 import java.sql.SQLTransientException;
52 import java.util.ArrayList;
53 import java.util.List;
54 import java.util.Properties;
55 import java.util.concurrent.Executors;
56 import java.util.concurrent.RejectedExecutionException;
57 import java.util.concurrent.ScheduledExecutorService;
58 import java.util.concurrent.Semaphore;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.atomic.AtomicBoolean;
61 import java.util.concurrent.atomic.AtomicInteger;
62 import java.util.concurrent.atomic.AtomicReference;
63 import org.apache.commons.dbcp2.BasicDataSource;
64 import org.junit.After;
65 import org.junit.AfterClass;
66 import org.junit.Before;
67 import org.junit.BeforeClass;
68 import org.junit.Test;
69 import org.mockito.ArgumentCaptor;
70 import org.mockito.Mock;
71 import org.mockito.MockitoAnnotations;
72 import org.onap.policy.common.utils.services.OrderedServiceImpl;
73 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
74 import org.onap.policy.drools.core.lock.Lock;
75 import org.onap.policy.drools.core.lock.LockCallback;
76 import org.onap.policy.drools.core.lock.LockState;
77 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
78 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
79 import org.onap.policy.drools.system.PolicyEngine;
80 import org.onap.policy.drools.system.PolicyEngineConstants;
81 import org.powermock.reflect.Whitebox;
83 public class DistributedLockManagerTest {
// delay (seconds) the tests expect for a normally scheduled expiration check (see runChecker() calls)
84 private static final long EXPIRE_SEC = 900L;
// delay (seconds) the tests expect when the check is rescheduled sooner after a DB failure
85 private static final long RETRY_SEC = 60L;
// name of the PolicyEngine manager field that is read and replaced via Whitebox
86 private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
87 private static final String OTHER_HOST = "other-host";
88 private static final String OTHER_OWNER = "other-owner";
89 private static final String EXPECTED_EXCEPTION = "expected exception";
// in-memory H2 database; the INIT clause creates and selects the "pooling" schema
90 private static final String DB_CONNECTION =
91 "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
92 private static final String DB_USER = "user";
93 private static final String DB_PASSWORD = "password";
94 private static final String OWNER_KEY = "my key";
95 private static final String RESOURCE = "my resource";
96 private static final String RESOURCE2 = "my resource #2";
97 private static final String RESOURCE3 = "my resource #3";
98 private static final String RESOURCE4 = "my resource #4";
99 private static final String RESOURCE5 = "my resource #5";
100 private static final int HOLD_SEC = 100;
101 private static final int HOLD_SEC2 = 120;
102 private static final int MAX_THREADS = 5;
103 private static final int MAX_LOOPS = 100;
// flags passed to InvalidDbLockingFeature to select the kind of DB failure it simulates
104 private static final boolean TRANSIENT = true;
105 private static final boolean PERMANENT = false;
107 // number of execute() calls before the first lock attempt
108 private static final int PRE_LOCK_EXECS = 1;
110 // number of execute() calls before the first schedule attempt
111 private static final int PRE_SCHED_EXECS = 1;
// shared DB connection, opened in setUpBeforeClass()
113 private static Connection conn = null;
// executor originally held by the PolicyEngine manager, and the real pool that replaces it
114 private static ScheduledExecutorService saveExec;
115 private static ScheduledExecutorService realExec;
// NOTE(review): setUp() calls initMocks(this), so some of the fields below are presumably
// @Mock fields - the annotations are not visible in this view; confirm
118 private ScheduledExecutorService exsvc;
121 private LockCallback callback;
124 private BasicDataSource datasrc;
127 private PolicyEngine engine;
129 private DistributedLock lock;
131 private AtomicInteger nactive;
132 private AtomicInteger nsuccesses;
133 private DistributedLockManager feature;
137 * Configures the location of the property files and creates the DB.
139 * @throws SQLException if the DB cannot be created
142 public static void setUpBeforeClass() throws SQLException {
// One-time setup: points the persistence manager at src/test/resources, opens the shared
// DB connection, creates the pooling.locks table, and replaces the PolicyEngine manager's
// executor with a real 3-thread scheduled pool (the original is kept in saveExec).
143 SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
145 conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
147 try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
148 + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
149 + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
150 createStmt.executeUpdate();
// remember the engine's current executor so that it can be restored after the class finishes
153 saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
155 realExec = Executors.newScheduledThreadPool(3);
156 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
160 * Restores static fields.
163 public static void tearDownAfterClass() throws SQLException {
// puts the executor saved by setUpBeforeClass() back into the PolicyEngine manager
164 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
170 * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute its tasks.
173 * @throws SQLException if the lock records cannot be deleted from the DB
176 public void setUp() throws SQLException {
// Per-test setup: initializes the Mockito-managed fields, resets the counters, makes the
// engine hand out exsvc as its executor service, and creates a fresh MyLockingFeature.
177 MockitoAnnotations.initMocks(this);
179 nactive = new AtomicInteger(0);
180 nsuccesses = new AtomicInteger(0);
184 when(engine.getExecutorService()).thenReturn(exsvc);
186 feature = new MyLockingFeature(true);
190 public void tearDown() throws SQLException {
// NOTE(review): the body is not visible in this view; presumably it calls shutdownFeature()
// and cleanDb() - confirm
195 private void cleanDb() throws SQLException {
// removes every row from pooling.locks, using the shared connection
196 try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
197 stmt.executeUpdate();
201 private void shutdownFeature() {
// stops the current feature, if there is one, by invoking afterStop() with the mock engine
202 if (feature != null) {
203 feature.afterStop(engine);
209 * Tests that the feature is found in the expected service sets.
212 public void testServiceApis() {
// at least one registered PolicyEngineFeatureApi implementation must be a DistributedLockManager
213 assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
214 .anyMatch(obj -> obj instanceof DistributedLockManager));
218 * Tests afterStart() when the properties contain an invalid value.
221 public void testDistributedLockManagerInvalidProperties() {
// afterStart() must fail with a DistributedLockManagerException when the feature's
// properties hold a non-numeric value for EXPIRE_CHECK_SEC
222 // use properties containing an invalid value
223 Properties props = new Properties();
224 props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc");
226 feature = new MyLockingFeature(false) {
// NOTE(review): presumably returns props - the body of this override is not visible here
228 protected Properties getProperties(String fileName) {
233 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
237 public void testGetSequenceNumber() {
// the feature is expected to report a sequence number of 1000
238 assertEquals(1000, feature.getSequenceNumber());
242 public void testStartableApi() {
// start() and stop() both return true without changing the alive state; only
// afterStop() makes the feature report that it is no longer alive
243 assertTrue(feature.isAlive());
244 assertTrue(feature.start());
245 assertTrue(feature.stop());
248 // above should have had no effect
249 assertTrue(feature.isAlive());
251 feature.afterStop(engine);
252 assertFalse(feature.isAlive());
256 public void testLockApi() {
// the feature starts out not locked, and both lock() and unlock() return true
257 assertFalse(feature.isLocked());
258 assertTrue(feature.lock());
259 assertTrue(feature.unlock());
263 public void testBeforeCreateLockManager() {
// beforeCreateLockManager() is expected to return the feature itself
264 assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
268 * Tests beforeCreateLockManager(), when getProperties() throws a runtime exception.
271 public void testBeforeCreateLockManagerEx() {
// a runtime exception thrown by getProperties() must surface from
// beforeCreateLockManager() as a DistributedLockManagerException
274 feature = new MyLockingFeature(false) {
276 protected Properties getProperties(String fileName) {
277 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
281 assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
282 .isInstanceOf(DistributedLockManagerException.class);
286 public void testAfterStart() {
// the feature created in setUp() should have submitted exactly one execute() task and
// exactly one schedule() task to the executor service
287 // verify that cleanup & expire check are both added to the queue
288 verify(exsvc).execute(any());
289 verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
293 * Tests afterStart(), when thread pool throws a runtime exception.
296 public void testAfterStartExInThreadPool() {
// a runtime exception thrown by exsvc.schedule() must surface from afterStart() as a
// DistributedLockManagerException
299 feature = new MyLockingFeature(false);
301 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
302 .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION));
304 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
308 public void testDeleteExpiredDbLocks() throws SQLException {
// the clean-up task submitted via execute() should remove the expired records (hold
// times of -1 and 0) and leave the two unexpired ones, whoever owns them
309 // add records: two expired, two not
310 insertRecord(RESOURCE, feature.getUuidString(), -1);
311 insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
312 insertRecord(RESOURCE3, OTHER_OWNER, 0);
313 insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
315 // get the clean-up function and execute it
316 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
317 verify(exsvc).execute(captor.capture());
319 long tbegin = System.currentTimeMillis();
320 Runnable action = captor.getValue();
// NOTE(review): the statement that runs "action" is not visible in this view
323 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
324 assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
325 assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
326 assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
328 assertEquals(2, getRecordCount());
332 * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
334 * @throws SQLException if an error occurs
337 public void testDeleteExpiredDbLocksEx() throws SQLException {
// captures the clean-up task of a feature whose DB access fails; running the task is
// expected to complete without propagating the failure
338 feature = new InvalidDbLockingFeature(TRANSIENT);
340 // get the clean-up function and execute it
341 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
342 verify(exsvc).execute(captor.capture());
344 Runnable action = captor.getValue();
346 // should not throw an exception
351 public void testAfterStop() {
// exercises shutdown, including for a plain DistributedLockManager on which afterStart()
// was never invoked
354 feature = new DistributedLockManager();
356 // shutdown without calling afterStart()
362 * Tests afterStop(), when the data source throws an exception when close() is called.
364 * @throws SQLException if an error occurs
367 public void testAfterStopEx() throws SQLException {
// switches to an InvalidDbLockingFeature so that shutdown runs against the failing data source
370 // use a data source that throws an exception when closed
371 feature = new InvalidDbLockingFeature(TRANSIENT);
377 public void testCreateLock() throws SQLException {
// The first request for a resource is queued and stays WAITING until its request runs;
// further requests for the same resource are denied at once via lockUnavailable(). A
// request for a different resource is queued independently.
378 verify(exsvc).execute(any());
380 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
381 assertTrue(lock.isWaiting());
// creating the lock should have queued exactly one more execute() task
383 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
385 // this lock should fail
386 LockCallback callback2 = mock(LockCallback.class);
387 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
388 assertTrue(lock2.isUnavailable());
389 verify(callback2, never()).lockAvailable(lock2);
390 verify(callback2).lockUnavailable(lock2);
392 // this should fail, too
393 LockCallback callback3 = mock(LockCallback.class);
394 DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
395 assertTrue(lock3.isUnavailable());
396 verify(callback3, never()).lockAvailable(lock3);
397 verify(callback3).lockUnavailable(lock3);
399 // no change to first
400 assertTrue(lock.isWaiting());
402 // no callbacks to the first lock
403 verify(callback, never()).lockAvailable(lock);
404 verify(callback, never()).lockUnavailable(lock);
// nothing has been written to the DB while the first lock is still waiting
406 assertTrue(lock.isWaiting());
407 assertEquals(0, getRecordCount());
// NOTE(review): the step that runs the queued lock request is not visible in this view
410 assertTrue(lock.isActive());
411 assertEquals(1, getRecordCount());
413 verify(callback).lockAvailable(lock);
414 verify(callback, never()).lockUnavailable(lock);
416 // this should succeed
417 DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
418 assertTrue(lock4.isWaiting());
420 // after running checker, original records should still remain
421 runChecker(0, 0, EXPIRE_SEC);
422 assertEquals(1, getRecordCount());
423 verify(callback, never()).lockUnavailable(lock);
427 * Tests createLock() when the feature is not the latest instance.
430 public void testCreateLockNotLatestInstance() {
// once the latest-instance reference has been cleared, createLock() must return an
// UNAVAILABLE lock and notify the callback via lockUnavailable() only
431 DistributedLockManager.setLatestInstance(null);
433 Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
434 assertTrue(lock.isUnavailable());
435 verify(callback, never()).lockAvailable(any());
436 verify(callback).lockUnavailable(lock);
440 public void testCheckExpired() throws SQLException {
// Creates five locks, then alters three of their DB records (one expired, one with a
// different host, one with a different owner). After the checker runs, exactly those
// three locks must be UNAVAILABLE with lockUnavailable() invoked; the other two stay ACTIVE.
441 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
444 LockCallback callback2 = mock(LockCallback.class);
445 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
448 LockCallback callback3 = mock(LockCallback.class);
449 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
452 LockCallback callback4 = mock(LockCallback.class);
453 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
456 LockCallback callback5 = mock(LockCallback.class);
457 final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
460 assertEquals(5, getRecordCount());
// make the first record look expired (negative hold time)
463 updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
465 // change host of another record
466 updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
468 // change uuid of another record
469 updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
473 runChecker(0, 0, EXPIRE_SEC);
477 assertTrue(lock.isUnavailable());
478 assertTrue(lock2.isActive());
479 assertTrue(lock3.isUnavailable());
480 assertTrue(lock4.isActive());
481 assertTrue(lock5.isUnavailable());
487 verify(callback).lockUnavailable(lock);
488 verify(callback3).lockUnavailable(lock3);
489 verify(callback5).lockUnavailable(lock5);
491 verify(callback2, never()).lockUnavailable(lock2);
492 verify(callback4, never()).lockUnavailable(lock4);
495 // another check should have been scheduled, with the normal interval
496 runChecker(1, 0, EXPIRE_SEC);
500 * Tests checkExpired(), when schedule() throws an exception.
503 public void testCheckExpiredExecRejected() {
// runs the checker while every schedule() call is rejected by the executor service
504 // arrange for execution to be rejected
505 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
506 .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
508 runChecker(0, 0, EXPIRE_SEC);
512 * Tests checkExpired(), when getConnection() throws an exception.
515 public void testCheckExpiredSqlEx() {
// when the checker cannot reach the DB, the next check is expected to be scheduled
// after RETRY_SEC rather than EXPIRE_SEC
516 // use a data source that throws an exception when getConnection() is called
517 feature = new InvalidDbLockingFeature(TRANSIENT);
519 runChecker(0, 0, EXPIRE_SEC);
521 // it should have scheduled another check, sooner
522 runChecker(0, 0, RETRY_SEC);
526 public void testExpireLocks() throws SQLException {
// Runs the checker with one expired lock, one active lock, one lock left waiting, and
// one lock that is freed while the checker is running. Only the expired lock should
// receive a lockUnavailable() callback.
527 AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
529 feature = new MyLockingFeature(true) {
531 protected BasicDataSource makeDataSource() throws Exception {
532 // get the real data source
533 BasicDataSource src2 = super.makeDataSource();
// the mock data source consumes freeLock (at most once) before handing out a real connection
535 when(datasrc.getConnection()).thenAnswer(answer -> {
536 DistributedLock lck = freeLock.getAndSet(null);
545 return src2.getConnection();
552 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
555 LockCallback callback2 = mock(LockCallback.class);
556 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
559 LockCallback callback3 = mock(LockCallback.class);
560 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
561 // don't run doLock for lock3 - leave it in the waiting state
563 LockCallback callback4 = mock(LockCallback.class);
564 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
// only three records exist, because lock3 never reached the DB
567 assertEquals(3, getRecordCount());
// make the first record look expired (negative hold time)
570 updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
572 // arrange to free lock4 while the checker is running
576 runChecker(0, 0, EXPIRE_SEC);
580 assertTrue(lock.isUnavailable());
581 assertTrue(lock2.isActive());
582 assertTrue(lock3.isWaiting());
583 assertTrue(lock4.isUnavailable());
586 verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any());
// lock4 was freed rather than lost, so its callback must not be notified either
588 verify(callback).lockUnavailable(lock);
589 verify(callback2, never()).lockUnavailable(lock2);
590 verify(callback3, never()).lockUnavailable(lock3);
591 verify(callback4, never()).lockUnavailable(lock4);
595 public void testDistributedLockNoArgs() {
// a lock built with the no-arg constructor has null identifiers, no callback, and a zero hold time
596 DistributedLock lock = new DistributedLock();
597 assertNull(lock.getResourceId());
598 assertNull(lock.getOwnerKey());
599 assertNull(lock.getCallback());
600 assertEquals(0, lock.getHoldSec());
604 public void testDistributedLock() {
// a negative hold time must be rejected with an IllegalArgumentException mentioning
// "holdSec"; a valid hold time is accepted
605 assertThatIllegalArgumentException()
606 .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
607 .withMessageContaining("holdSec");
609 // should generate no exception
610 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
614 public void testDistributedLockSerializable() throws Exception {
// after a round trip the lock keeps its state, resource id, owner key and hold time,
// but no longer has a callback
615 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
616 lock = roundTrip(lock);
618 assertTrue(lock.isWaiting());
620 assertEquals(RESOURCE, lock.getResourceId());
621 assertEquals(OWNER_KEY, lock.getOwnerKey());
622 assertNull(lock.getCallback());
623 assertEquals(HOLD_SEC, lock.getHoldSec());
627 public void testGrant() {
// a new lock is not active until its doLock() request has run; once it has, the lock
// is ACTIVE and lockAvailable() has been invoked
628 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
629 assertFalse(lock.isActive());
631 // execute the doLock() call
634 assertTrue(lock.isActive());
636 // the callback for the lock should have been run in the foreground thread
637 verify(callback).lockAvailable(lock);
641 * Tests grant() when the lock is already unavailable.
644 public void testDistributedLockGrantUnavailable() {
// a lock that was forced to UNAVAILABLE must remain so, with neither callback method invoked
645 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
646 lock.setState(LockState.UNAVAILABLE);
649 assertTrue(lock.isUnavailable());
650 verify(callback, never()).lockAvailable(any());
651 verify(callback, never()).lockUnavailable(any());
655 public void testDistributedLockDeny() {
// a second request for a resource that already has a lock is denied immediately, and
// queues no additional work on the executor service
657 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
659 // get another lock - should fail
660 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
662 assertTrue(lock.isUnavailable());
664 // the callback for the second lock should have been run in the foreground thread
665 verify(callback).lockUnavailable(lock);
667 // should only have a request for the first lock
668 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
672 public void testDistributedLockFree() {
// free() succeeds once and leaves the lock UNAVAILABLE; a second free() returns false
// and queues nothing further. The resource can then be locked again by a new request.
673 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
675 assertTrue(lock.free());
676 assertTrue(lock.isUnavailable());
678 // run both requests associated with the lock
682 // should not have changed state
683 assertTrue(lock.isUnavailable());
685 // attempt to free it again
686 assertFalse(lock.free());
688 // should not have queued anything else
689 verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
691 // new lock should succeed
692 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
693 assertTrue(lock2 != lock);
694 assertTrue(lock2.isWaiting());
698 * Tests that free() works on a serialized lock with a new feature.
700 * @throws Exception if an error occurs
703 public void testDistributedLockFreeSerialized() throws Exception {
// the lock is created under one feature instance, then round-tripped and freed after a
// new feature instance has been created; free() must still succeed
704 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
706 feature = new MyLockingFeature(true);
708 lock = roundTrip(lock);
709 assertTrue(lock.free());
710 assertTrue(lock.isUnavailable());
714 * Tests free() on a serialized lock without a feature.
716 * @throws Exception if an error occurs
719 public void testDistributedLockFreeNoFeature() throws Exception {
// with the latest-instance reference cleared, free() on the round-tripped lock returns
// false, yet the lock still ends up UNAVAILABLE
720 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
722 DistributedLockManager.setLatestInstance(null);
724 lock = roundTrip(lock);
725 assertFalse(lock.free());
726 assertTrue(lock.isUnavailable());
730 * Tests the case where the lock is freed and doUnlock called between the call to
731 * isUnavailable() and the call to compute().
734 public void testDistributedLockFreeUnlocked() {
// NOTE(review): FreeWithFreeLockingFeature is defined outside this view; presumably it frees
// the lock in the middle of free() - confirm. The outer free() must then return false.
735 feature = new FreeWithFreeLockingFeature(true);
737 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
739 assertFalse(lock.free());
740 assertTrue(lock.isUnavailable());
744 * Tests the case where the lock is freed, but doUnlock is not completed, between the
745 * call to isUnavailable() and the call to compute().
748 public void testDistributedLockFreeLockFreed() {
// same scenario as testDistributedLockFreeUnlocked(), but the feature is built with
// "false"; the outer free() must again return false and leave the lock UNAVAILABLE
749 feature = new FreeWithFreeLockingFeature(false);
751 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
753 assertFalse(lock.free());
754 assertTrue(lock.isUnavailable());
758 public void testDistributedLockExtend() {
// extend() on a denied lock keeps reporting lockUnavailable(), even when its state is
// forced to ACTIVE; a negative hold time is rejected; extending the real owner's lock
// goes through WAITING and ends with lockAvailable() on the new callback
759 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
761 // lock2 should be denied - called back by this thread
762 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
763 verify(callback, never()).lockAvailable(lock2);
764 verify(callback).lockUnavailable(lock2);
766 // lock2 will still be denied - called back by this thread
767 lock2.extend(HOLD_SEC, callback);
768 verify(callback, times(2)).lockUnavailable(lock2);
770 // force lock2 to be active - should still be denied
771 Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
772 lock2.extend(HOLD_SEC, callback);
773 verify(callback, times(3)).lockUnavailable(lock2);
775 assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
776 .withMessageContaining("holdSec");
780 assertTrue(lock.isActive());
782 // now extend the first lock
783 LockCallback callback2 = mock(LockCallback.class);
784 lock.extend(HOLD_SEC2, callback2);
785 assertTrue(lock.isWaiting());
787 // execute doExtend()
789 lock.extend(HOLD_SEC2, callback2);
790 assertEquals(HOLD_SEC2, lock.getHoldSec());
791 verify(callback2).lockAvailable(lock);
792 verify(callback2, never()).lockUnavailable(lock);
796 * Tests that extend() works on a serialized lock with a new feature.
798 * @throws Exception if an error occurs
801 public void testDistributedLockExtendSerialized() throws Exception {
// an ACTIVE lock stays ACTIVE across a round trip, and can be extended after a new
// feature instance has been created: it goes to WAITING, then back to ACTIVE, and the
// callback supplied to extend() receives lockAvailable()
802 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
806 assertTrue(lock.isActive());
808 feature = new MyLockingFeature(true);
810 lock = roundTrip(lock);
811 assertTrue(lock.isActive());
813 LockCallback scallback = mock(LockCallback.class);
815 lock.extend(HOLD_SEC, scallback);
816 assertTrue(lock.isWaiting());
818 // run doExtend (in new feature)
820 assertTrue(lock.isActive());
822 verify(scallback).lockAvailable(lock);
823 verify(scallback, never()).lockUnavailable(lock);
827 * Tests extend() on a serialized lock without a feature.
829 * @throws Exception if an error occurs
832 public void testDistributedLockExtendNoFeature() throws Exception {
// with the latest-instance reference cleared, extend() on the round-tripped lock must
// make it UNAVAILABLE and notify the supplied callback via lockUnavailable() only
833 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
837 assertTrue(lock.isActive());
839 DistributedLockManager.setLatestInstance(null);
841 lock = roundTrip(lock);
842 assertTrue(lock.isActive());
844 LockCallback scallback = mock(LockCallback.class);
846 lock.extend(HOLD_SEC, scallback);
847 assertTrue(lock.isUnavailable());
849 verify(scallback, never()).lockAvailable(lock);
850 verify(scallback).lockUnavailable(lock);
854 * Tests the case where the lock is freed and doUnlock called between the call to
855 * isUnavailable() and the call to compute().
858 public void testDistributedLockExtendUnlocked() {
// NOTE(review): FreeWithFreeLockingFeature is defined outside this view; presumably it frees
// the lock in the middle of extend() - confirm. extend() must then report lockUnavailable().
859 feature = new FreeWithFreeLockingFeature(true);
861 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
863 lock.extend(HOLD_SEC2, callback);
864 assertTrue(lock.isUnavailable());
865 verify(callback).lockUnavailable(lock);
869 * Tests the case where the lock is freed, but doUnlock is not completed, between the
870 * call to isUnavailable() and the call to compute().
873 public void testDistributedLockExtendLockFreed() {
// same scenario as testDistributedLockExtendUnlocked(), but the feature is built with
// "false"; extend() must again leave the lock UNAVAILABLE and invoke lockUnavailable()
874 feature = new FreeWithFreeLockingFeature(false);
876 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
878 lock.extend(HOLD_SEC2, callback);
879 assertTrue(lock.isUnavailable());
880 verify(callback).lockUnavailable(lock);
884 public void testDistributedLockScheduleRequest() {
// the request queued for a new lock should end with lockAvailable() being invoked
// NOTE(review): the step that runs the queued request is not visible in this view
885 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
888 verify(callback).lockAvailable(lock);
892 public void testDistributedLockRescheduleRequest() throws SQLException {
// a failed doLock leaves the lock WAITING with a retry scheduled; if the lock is freed
// while the retry runs, it becomes UNAVAILABLE without a lockUnavailable() callback,
// and a further retry is scheduled
893 // use a data source that throws an exception when getConnection() is called
894 InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
897 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
899 // invoke doLock - should fail and reschedule
902 // should still be waiting
903 assertTrue(lock.isWaiting());
904 verify(callback, never()).lockUnavailable(lock);
906 // free the lock while doLock is executing
907 invfeat.freeLock = true;
909 // try scheduled request - should just invoke doUnlock
912 // should now be unavailable, but without any callback
913 assertTrue(lock.isUnavailable());
914 verify(callback, never()).lockUnavailable(lock);
916 // should have scheduled a retry of doUnlock
917 verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
921 public void testDistributedLockGetNextRequest() {
// creates a lock so that its queued request can be run, exercising getNextRequest()
922 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
925 * run doLock. This should cause getNextRequest() to be called twice, once with a
926 * request in the queue, and the second time with request=null.
932 * Tests getNextRequest(), where the same request is still in the queue the second time.
936 public void testDistributedLockGetNextRequestSameRequest() {
// with a failing data source the request is rescheduled, so exactly one additional
// schedule() call is expected on the executor service
937 // force reschedule to be invoked
938 feature = new InvalidDbLockingFeature(TRANSIENT);
940 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
943 * run doLock. This should cause getNextRequest() to be called twice, once with a
944 * request in the queue, and the second time with the same request again.
948 verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
952 public void testDistributedLockDoRequest() throws SQLException {
// the lock is WAITING when created and ACTIVE once its request has run
953 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
955 assertTrue(lock.isWaiting());
957 // run doLock via doRequest
960 assertTrue(lock.isActive());
964 * Tests doRequest(), when doRequest() is already running within another thread.
967 public void testDistributedLockDoRequestBusy() {
// Overrides doDbInsert() so that, during the insert, a daemon thread also tries to run
// the request. "returned" records whether that thread had finished (was no longer
// alive) by the time the foreground insert continued; the test expects it to be true.
969 * this feature will invoke a request in a background thread while it's being run
970 * in a foreground thread.
972 AtomicBoolean running = new AtomicBoolean(false);
973 AtomicBoolean returned = new AtomicBoolean(false);
975 feature = new MyLockingFeature(true) {
977 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
978 LockCallback callback) {
979 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
980 private static final long serialVersionUID = 1L;
983 protected boolean doDbInsert(Connection conn) throws SQLException {
// NOTE(review): presumably guarded by "running" - the condition is not visible in this view
985 // already inside the thread - don't recurse any further
986 return super.doDbInsert(conn);
991 Thread thread = new Thread(() -> {
992 // run doLock from within the new thread
995 thread.setDaemon(true);
998 // wait for the background thread to complete before continuing
1001 } catch (InterruptedException ignore) {
// restore the interrupt flag so that callers can still observe the interruption
1002 Thread.currentThread().interrupt();
1005 returned.set(!thread.isAlive());
1007 return super.doDbInsert(conn);
1013 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1018 assertTrue(returned.get());
1022 * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1024 * @throws SQLException if an error occurs
1027 public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
// a runtime (non-SQL) exception from getConnection() must make the lock UNAVAILABLE,
// notify the callback, and schedule no retry
1028 // throw run-time exception
1029 when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1031 // use a data source that throws an exception when getConnection() is called
1032 feature = new MyLockingFeature(true) {
// NOTE(review): presumably returns the mock datasrc - the body is not visible in this view
1034 protected BasicDataSource makeDataSource() throws Exception {
1039 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1041 // invoke doLock - should NOT reschedule
1044 assertTrue(lock.isUnavailable());
1045 verify(callback).lockUnavailable(lock);
1047 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1051 * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE state.
1054 * @throws SQLException if an error occurs
1057 public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
// a runtime exception from getConnection() must leave the lock UNAVAILABLE without
// invoking lockUnavailable(), and schedule no retry
1058 // throw run-time exception
1059 when(datasrc.getConnection()).thenAnswer(answer -> {
// NOTE(review): a statement before this throw is not visible in this view (presumably
// it makes the lock UNAVAILABLE first) - confirm
1061 throw new IllegalStateException(EXPECTED_EXCEPTION);
1064 // use a data source that throws an exception when getConnection() is called
1065 feature = new MyLockingFeature(true) {
1067 protected BasicDataSource makeDataSource() throws Exception {
1072 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1074 // invoke doLock - should NOT reschedule
1077 assertTrue(lock.isUnavailable());
1078 verify(callback, never()).lockUnavailable(lock);
1080 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1084 * Tests doRequest() when the retry count gets exhausted.
1087 public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
// while the DB keeps failing, the lock stays WAITING through the initial attempt and the
// first retry; only after the final retry fails does it become UNAVAILABLE and the
// callback get notified
1088 // use a data source that throws an exception when getConnection() is called
1089 feature = new InvalidDbLockingFeature(TRANSIENT);
1091 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1093 // invoke doLock - should fail and reschedule
1096 // should still be waiting
1097 assertTrue(lock.isWaiting());
1098 verify(callback, never()).lockUnavailable(lock);
1100 // try again, via SCHEDULER - first retry fails
1103 // should still be waiting
1104 assertTrue(lock.isWaiting());
1105 verify(callback, never()).lockUnavailable(lock);
1107 // try again, via SCHEDULER - final retry fails
1109 assertTrue(lock.isUnavailable());
1111 // now callback should have been called
1112 verify(callback).lockUnavailable(lock);
1116 * Tests doRequest() when a non-transient DB exception is thrown.
1119 public void testDistributedLockDoRequestNotTransient() {
// a non-transient DB failure must fail the lock at once: UNAVAILABLE, callback
// notified, and no extra execute() or schedule() calls
1121 * use a data source that throws a PERMANENT exception when getConnection() is
1124 feature = new InvalidDbLockingFeature(PERMANENT);
1126 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1128 // invoke doLock - should fail
1131 assertTrue(lock.isUnavailable());
1132 verify(callback).lockUnavailable(lock);
1134 // should not have scheduled anything new
1135 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1136 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1140 public void testDistributedLockDoLock() throws SQLException {
// Happy path: a lock request leaves exactly one DB record with the expected
// expiration time and reports the lock as available.
1141 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1143 // invoke doLock - should simply do an insert
// taken before the DB work so that recordInRange() can bound the elapsed time
1144 long tbegin = System.currentTimeMillis();
1147 assertEquals(1, getRecordCount());
1148 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1149 verify(callback).lockAvailable(lock);
1153 * Tests doLock() when the lock is freed before doLock runs.
1155 * @throws SQLException if an error occurs
1158 public void testDistributedLockDoLockFreed() throws SQLException {
1159 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// force the state directly to mimic the lock having been freed in the meantime
1161 lock.setState(LockState.UNAVAILABLE);
1163 // invoke doLock - should do nothing
// nothing may have been written to the DB ...
1166 assertEquals(0, getRecordCount());
// ... and the owner must not be told that it holds the lock
1168 verify(callback, never()).lockAvailable(lock);
1172 * Tests doLock() when a DB exception is thrown.
1175 public void testDistributedLockDoLockEx() {
1176 // use a data source that throws an exception when getConnection() is called
1177 feature = new InvalidDbLockingFeature(PERMANENT);
1179 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1181 // invoke doLock - should fail because the data source throws
1184 // lock should have failed due to exception
1185 verify(callback).lockUnavailable(lock);
1189 * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1193 public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1194 // insert an expired record
// an offset of 0 seconds makes the record expire at the moment it is inserted
1195 insertRecord(RESOURCE, feature.getUuidString(), 0);
1197 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1199 // invoke doLock - should simply do an update
// the stale record must not prevent the lock from being granted
1201 verify(callback).lockAvailable(lock);
1205 * Tests doLock() when a locked record already exists.
1208 public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1209 // insert a record, held by a different owner, that expires HOLD_SEC seconds from now
1210 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1212 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1217 // lock should have failed because it's already locked
1218 verify(callback).lockUnavailable(lock);
1222 public void testDistributedLockDoUnlock() throws SQLException {
// After unlocking, the DB record must be gone and the lock unavailable. The
// only callback ever seen is the single lockAvailable() verified below.
1223 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1230 // invoke doUnlock()
1231 long tbegin = System.currentTimeMillis();
// the record must have been removed from the DB
1234 assertEquals(0, getRecordCount());
1235 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1237 assertTrue(lock.isUnavailable());
1239 // no more callbacks should have occurred
1240 verify(callback, times(1)).lockAvailable(lock);
1241 verify(callback, never()).lockUnavailable(lock);
1245 * Tests doUnlock() when a DB exception is thrown.
1247 * @throws SQLException if an error occurs
1250 public void testDistributedLockDoUnlockEx() throws SQLException {
// Even though the data source throws, the lock must end up unavailable and the
// failure must not surface through either callback method.
1251 feature = new InvalidDbLockingFeature(PERMANENT);
1253 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1255 // do NOT invoke doLock() - it will fail without a DB connection
1259 // invoke doUnlock()
1262 assertTrue(lock.isUnavailable());
1264 // no more callbacks should have occurred
1265 verify(callback, never()).lockAvailable(lock);
1266 verify(callback, never()).lockUnavailable(lock);
1270 public void testDistributedLockDoExtend() throws SQLException {
// Happy path for extend(): the single DB record is moved to the HOLD_SEC2
// expiration, the lock stays active, and only the extension's own callback
// receives the new lockAvailable() notification.
1271 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// a separate callback lets the test tell extension results from the original grant
1274 LockCallback callback2 = mock(LockCallback.class);
1275 lock.extend(HOLD_SEC2, callback2);
1278 long tbegin = System.currentTimeMillis();
1281 assertEquals(1, getRecordCount());
1282 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1284 assertTrue(lock.isActive());
1286 // no more callbacks should have occurred
1287 verify(callback).lockAvailable(lock);
1288 verify(callback, never()).lockUnavailable(lock);
1290 // extension should have succeeded
1291 verify(callback2).lockAvailable(lock);
1292 verify(callback2, never()).lockUnavailable(lock);
1296 * Tests doExtend() when the lock is freed before doExtend runs.
1298 * @throws SQLException if an error occurs
1301 public void testDistributedLockDoExtendFreed() throws SQLException {
1302 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1303 lock.extend(HOLD_SEC2, callback);
// force the state directly to mimic the lock having been freed in the meantime
1305 lock.setState(LockState.UNAVAILABLE);
1307 // invoke doExtend - should do nothing
// nothing may have been written to the DB, and no grant may have been reported
1310 assertEquals(0, getRecordCount());
1312 verify(callback, never()).lockAvailable(lock);
1316 * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1319 * @throws SQLException if an error occurs
1322 public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1323 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// a separate callback lets the test tell extension results from the original grant
1326 LockCallback callback2 = mock(LockCallback.class);
1327 lock.extend(HOLD_SEC2, callback2);
1329 // delete the record so it's forced to re-insert it
1333 long tbegin = System.currentTimeMillis();
// the record must exist again, now carrying the HOLD_SEC2 expiration
1336 assertEquals(1, getRecordCount());
1337 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1339 assertTrue(lock.isActive());
1341 // no more callbacks should have occurred
1342 verify(callback).lockAvailable(lock);
1343 verify(callback, never()).lockUnavailable(lock);
1345 // extension should have succeeded
1346 verify(callback2).lockAvailable(lock);
1347 verify(callback2, never()).lockUnavailable(lock);
1351 * Tests doExtend() when both update and insert fail.
1353 * @throws SQLException if an error occurs
1356 public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1358 * this feature will create a lock that returns false when doDbUpdate() is
1359 * invoked, or when doDbInsert() is invoked a second time
1361 feature = new MyLockingFeature(true) {
1363 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1364 LockCallback callback) {
1365 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1366 private static final long serialVersionUID = 1L;
// presumably counts doDbInsert() calls so that only the second one is failed
1367 private int ntimes = 0;
1370 protected boolean doDbInsert(Connection conn) throws SQLException {
1375 return super.doDbInsert(conn);
1379 protected boolean doDbUpdate(Connection conn) {
1386 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// a separate callback lets the test tell extension results from the original grant
1389 LockCallback callback2 = mock(LockCallback.class);
1390 lock.extend(HOLD_SEC2, callback2);
// with neither update nor insert succeeding, the lock is lost
1395 assertTrue(lock.isUnavailable());
1397 // no more callbacks should have occurred
1398 verify(callback).lockAvailable(lock);
1399 verify(callback, never()).lockUnavailable(lock);
1401 // extension should have failed
1402 verify(callback2, never()).lockAvailable(lock);
1403 verify(callback2).lockUnavailable(lock);
1407 * Tests doExtend() when an exception occurs.
1409 * @throws SQLException if an error occurs
1412 public void testDistributedLockDoExtendEx() throws SQLException {
1413 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1417 * delete the record and insert one with a different owner, which will cause
1418 * doDbInsert() to throw an exception
1421 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
// a separate callback lets the test tell extension results from the original grant
1423 LockCallback callback2 = mock(LockCallback.class);
1424 lock.extend(HOLD_SEC2, callback2);
// the failed extension costs the owner the lock
1429 assertTrue(lock.isUnavailable());
1431 // no more callbacks should have occurred
1432 verify(callback).lockAvailable(lock);
1433 verify(callback, never()).lockUnavailable(lock);
1435 // extension should have failed
1436 verify(callback2, never()).lockAvailable(lock);
1437 verify(callback2).lockUnavailable(lock);
1441 public void testDistributedLockToString() {
1442 String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1443 assertNotNull(text);
1444 assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1448 public void testMakeThreadPool() {
// Exercises an unmodified DistributedLockManager rather than one of the test
// subclasses defined in this file.
1449 // use a REAL feature to test this
1450 feature = new DistributedLockManager();
1452 // this should create a thread pool
1453 feature.beforeCreateLockManager(engine, new Properties());
1454 feature.afterStart(engine);
1460 * Performs a multi-threaded test of the locking facility.
1462 * @throws InterruptedException if the current thread is interrupted while waiting for
1463 * the background threads to complete
1466 public void testMultiThreaded() throws InterruptedException {
// uses a real manager wired to the real policy engine, not the mocked executor
1467 feature = new DistributedLockManager();
1468 feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1469 feature.afterStart(PolicyEngineConstants.getManager());
// build all workers first, then start them together
1471 List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1472 for (int x = 0; x < MAX_THREADS; ++x) {
1473 threads.add(new MyThread());
1476 threads.forEach(Thread::start);
// every worker must have terminated by the time it is checked
1478 for (MyThread thread : threads) {
1480 assertFalse(thread.isAlive());
// inspect each worker for an assertion failure it recorded
1483 for (MyThread thread : threads) {
1484 if (thread.err != null) {
// at least one attempt, across all workers, must have obtained the lock
1489 assertTrue(nsuccesses.get() > 0);
1492 private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1493 boolean waitForLock) {
1494 return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1497 private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1498 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1499 try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1500 oos.writeObject(lock);
1503 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1504 try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1505 return (DistributedLock) ois.readObject();
1510 * Runs the checkExpired() action.
1512 * @param nskip number of actions in the work queue to skip
1513 * @param nadditional number of additional actions that appear in the work queue
1514 * <i>after</i> the checkExpired action
1515 * @param schedSec number of seconds for which the checker should have been scheduled
1517 private void runChecker(int nskip, int nadditional, long schedSec) {
// the captor collects every Runnable handed to schedule() with the expected delay
1518 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// this verify() doubles as an assertion on the exact number of matching schedule() calls
1519 verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
// the captured list is zero-based, so index nskip is the first action not skipped
1520 Runnable action = captor.getAllValues().get(nskip);
1525 * Runs a lock action (e.g., doLock, doUnlock).
1527 * @param nskip number of actions in the work queue to skip
1528 * @param nadditional number of additional actions that appear in the work queue
1529 * <i>after</i> the desired action
1531 void runLock(int nskip, int nadditional) {
// the captor collects every Runnable handed to execute()
1532 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// this verify() doubles as an assertion on the exact number of execute() calls so far
1533 verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
// step over the first PRE_LOCK_EXECS captured actions as well as the nskip requested
1535 Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1540 * Runs a scheduled action (e.g., "retry" action).
1542 * @param nskip number of actions in the work queue to skip
1543 * @param nadditional number of additional actions that appear in the work queue
1544 * <i>after</i> the desired action
1546 void runSchedule(int nskip, int nadditional) {
// the captor collects every Runnable handed to schedule(), whatever its delay or unit
1547 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// this verify() doubles as an assertion on the exact number of schedule() calls so far
1548 verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
// step over the first PRE_SCHED_EXECS captured actions as well as the nskip requested
1550 Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1555 * Gets a count of the number of lock records in the DB.
1557 * @return the number of lock records in the DB
1558 * @throws SQLException if an error occurs accessing the DB
1560 private int getRecordCount() throws SQLException {
// both the statement and the result set are closed by the try-with-resources
1561 try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1562 ResultSet result = stmt.executeQuery()) {
// column 1 of the first row carries the count(*) value
1564 if (result.next()) {
1565 return result.getInt(1);
1574 * Determines if there is a record for the given resource whose expiration time is in
1575 * the expected range.
1577 * @param resourceId ID of the resource of interest
1578 * @param uuidString UUID string of the owner
1579 * @param holdSec seconds for which the lock was to be held
1580 * @param tbegin earliest time, in milliseconds, at which the record could have been
1581 * inserted into the DB
1582 * @return {@code true} if a record is found, {@code false} otherwise
1583 * @throws SQLException if an error occurs accessing the DB
1585 private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1586 try (PreparedStatement stmt =
1587 conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1588 + " WHERE resourceId=? AND host=? AND owner=?")) {
1590 stmt.setString(1, resourceId);
1591 stmt.setString(2, feature.getHostName());
1592 stmt.setString(3, uuidString);
1594 try (ResultSet result = stmt.executeQuery()) {
1595 if (result.next()) {
1596 int remaining = result.getInt(1);
1597 long maxDiff = System.currentTimeMillis() - tbegin;
1598 return (remaining >= 0 && holdSec - remaining <= maxDiff);
1608 * Inserts a record into the DB.
1610 * @param resourceId ID of the resource of interest
1611 * @param uuidString UUID string of the owner
1612 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1613 * @throws SQLException if an error occurs accessing the DB
1615 private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1616 this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
1619 private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1620 throws SQLException {
1621 try (PreparedStatement stmt =
1622 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1623 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1625 stmt.setString(1, resourceId);
1626 stmt.setString(2, hostName);
1627 stmt.setString(3, uuidString);
1628 stmt.setInt(4, expireOffset);
1630 assertEquals(1, stmt.executeUpdate());
1635 * Updates a record in the DB.
1637 * @param resourceId ID of the resource of interest
1638 * @param newUuid UUID string of the <i>new</i> owner
1639 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1640 * @throws SQLException if an error occurs accessing the DB
1642 private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1643 try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1644 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1646 stmt.setString(1, newHost);
1647 stmt.setString(2, newUuid);
1648 stmt.setInt(3, expireOffset);
1649 stmt.setString(4, resourceId);
1651 assertEquals(1, stmt.executeUpdate());
1656 * Feature that uses <i>exsvc</i> to execute requests.
1658 private class MyLockingFeature extends DistributedLockManager {
1660 public MyLockingFeature(boolean init) {
// every instance installs a brand-new mock executor in the enclosing test's exsvc field
1663 exsvc = mock(ScheduledExecutorService.class);
// the engine hands out that mock, so the tests can capture whatever gets submitted to it
1664 when(engine.getExecutorService()).thenReturn(exsvc);
// NOTE(review): presumably only done when "init" is true - the guard is not visible here
1667 beforeCreateLockManager(engine, new Properties());
1674 * Feature whose data source all throws exceptions.
1676 private class InvalidDbLockingFeature extends MyLockingFeature {
// selects which flavor of SQLException makeEx() builds
1677 private boolean isTransient;
1678 private boolean freeLock = false;
1680 public InvalidDbLockingFeature(boolean isTransient) {
1681 // pass "false" because we have to set the error code BEFORE calling
1685 this.isTransient = isTransient;
// the flag is set, so the feature can now be initialized and started
1687 this.beforeCreateLockManager(engine, new Properties());
1688 this.afterStart(engine);
1692 protected BasicDataSource makeDataSource() throws Exception {
// stubs the shared mock data source: getConnection() runs the answer below
1693 when(datasrc.getConnection()).thenAnswer(answer -> {
// closing the data source fails, too
1702 doThrow(makeEx()).when(datasrc).close();
// builds the exception for the configured failure mode
1707 private SQLException makeEx() {
// variant whose cause is an SQLTransientException
1709 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
// variant with no cause attached
1712 return new SQLException(EXPECTED_EXCEPTION);
1718 * Feature whose locks free themselves while free() is already running.
1720 private class FreeWithFreeLockingFeature extends MyLockingFeature {
// NOTE(review): presumably decides whether the resource is locked again after the nested release - confirm
1721 private boolean relock;
1723 public FreeWithFreeLockingFeature(boolean relock) {
1725 this.relock = relock;
1729 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1730 LockCallback callback) {
// hands out a lock subclass whose isUnavailable() is overridden
1732 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1733 private static final long serialVersionUID = 1L;
// NOTE(review): looks like a one-shot guard for the override below - its use is not visible here
1734 private boolean checked = false;
1737 public boolean isUnavailable() {
1739 return super.isUnavailable();
1744 // release and relock
// requests the same resource again under this lock's owner key, with a throw-away callback
1752 createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1762 * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1763 * extend it, and then unlock it.
1765 private class MyThread extends Thread {
// read by testMultiThreaded() once the threads have finished; stays null if nothing failed
1766 AssertionError err = null;
1775 for (int x = 0; x < MAX_LOOPS; ++x) {
1779 } catch (AssertionError e) {
// one lock / extend / free cycle
1784 private void makeAttempt() {
// starts with no permits, so tryAcquire() below blocks until something releases one
1786 Semaphore sem = new Semaphore(0);
1788 LockCallback cb = new LockCallback() {
1790 public void lockAvailable(Lock lock) {
1795 public void lockUnavailable(Lock lock) {
// every thread competes for the same RESOURCE, using its thread name as the owner key
1800 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1802 // wait for callback, whether available or unavailable
1803 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1804 if (!lock.isActive()) {
1808 nsuccesses.incrementAndGet();
// the counter must read exactly 1 after the increment, i.e. no other thread holds the lock
1810 assertEquals(1, nactive.incrementAndGet());
1812 lock.extend(HOLD_SEC2, cb);
// the extension reports back through the same callback, hence the same semaphore
1813 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1814 assertTrue(lock.isActive());
1816 // decrement BEFORE free()
1817 nactive.decrementAndGet();
1819 assertTrue(lock.free());
1820 assertTrue(lock.isUnavailable());
1822 } catch (InterruptedException e) {
// restore the interrupt flag, then report the interruption as an assertion failure
1823 Thread.currentThread().interrupt();
1824 throw new AssertionError("interrupted", e);