2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32 import static org.mockito.Matchers.any;
33 import static org.mockito.Matchers.anyLong;
34 import static org.mockito.Matchers.eq;
35 import static org.mockito.Mockito.doThrow;
36 import static org.mockito.Mockito.mock;
37 import static org.mockito.Mockito.never;
38 import static org.mockito.Mockito.times;
39 import static org.mockito.Mockito.verify;
40 import static org.mockito.Mockito.when;
42 import java.io.ByteArrayInputStream;
43 import java.io.ByteArrayOutputStream;
44 import java.io.ObjectInputStream;
45 import java.io.ObjectOutputStream;
46 import java.sql.Connection;
47 import java.sql.DriverManager;
48 import java.sql.PreparedStatement;
49 import java.sql.ResultSet;
50 import java.sql.SQLException;
51 import java.util.ArrayList;
52 import java.util.List;
53 import java.util.Properties;
54 import java.util.concurrent.Executors;
55 import java.util.concurrent.RejectedExecutionException;
56 import java.util.concurrent.ScheduledExecutorService;
57 import java.util.concurrent.Semaphore;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.atomic.AtomicBoolean;
60 import java.util.concurrent.atomic.AtomicInteger;
61 import java.util.concurrent.atomic.AtomicReference;
62 import org.apache.commons.dbcp2.BasicDataSource;
63 import org.junit.After;
64 import org.junit.AfterClass;
65 import org.junit.Before;
66 import org.junit.BeforeClass;
67 import org.junit.Test;
68 import org.mockito.ArgumentCaptor;
69 import org.mockito.Mock;
70 import org.mockito.MockitoAnnotations;
71 import org.onap.policy.common.utils.services.OrderedServiceImpl;
72 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
73 import org.onap.policy.drools.core.lock.Lock;
74 import org.onap.policy.drools.core.lock.LockCallback;
75 import org.onap.policy.drools.core.lock.LockState;
76 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
77 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
78 import org.onap.policy.drools.system.PolicyEngine;
79 import org.onap.policy.drools.system.PolicyEngineConstants;
80 import org.powermock.reflect.Whitebox;
82 public class DistributedLockManagerTest {
83 private static final long EXPIRE_SEC = 900L;
84 private static final long RETRY_SEC = 60L;
85 private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
86 private static final String OTHER_HOST = "other-host";
87 private static final String OTHER_OWNER = "other-owner";
88 private static final String EXPECTED_EXCEPTION = "expected exception";
89 private static final String DB_CONNECTION =
90 "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
91 private static final String DB_USER = "user";
92 private static final String DB_PASSWORD = "password";
93 private static final String OWNER_KEY = "my key";
94 private static final String RESOURCE = "my resource";
95 private static final String RESOURCE2 = "my resource #2";
96 private static final String RESOURCE3 = "my resource #3";
97 private static final String RESOURCE4 = "my resource #4";
98 private static final String RESOURCE5 = "my resource #5";
99 private static final int HOLD_SEC = 100;
100 private static final int HOLD_SEC2 = 120;
101 private static final int MAX_THREADS = 5;
102 private static final int MAX_LOOPS = 100;
103 private static final int TRANSIENT = 500;
104 private static final int PERMANENT = 600;
106 // number of execute() calls before the first lock attempt
107 private static final int PRE_LOCK_EXECS = 1;
109 // number of execute() calls before the first schedule attempt
110 private static final int PRE_SCHED_EXECS = 1;
112 private static Connection conn = null;
113 private static ScheduledExecutorService saveExec;
114 private static ScheduledExecutorService realExec;
117 private ScheduledExecutorService exsvc;
120 private LockCallback callback;
123 private BasicDataSource datasrc;
126 private PolicyEngine engine;
128 private DistributedLock lock;
130 private AtomicInteger nactive;
131 private AtomicInteger nsuccesses;
132 private DistributedLockManager feature;
136 * Configures the location of the property files and creates the DB.
138 * @throws SQLException if the DB cannot be created
141 public static void setUpBeforeClass() throws SQLException {
142 SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
144 conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
146 try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
147 + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
148 + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
149 createStmt.executeUpdate();
152 saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
154 realExec = Executors.newScheduledThreadPool(3);
155 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
159 * Restores static fields.
162 public static void tearDownAfterClass() throws SQLException {
163 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
169 * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
172 * @throws SQLException if the lock records cannot be deleted from the DB
175 public void setUp() throws SQLException {
176 MockitoAnnotations.initMocks(this);
178 nactive = new AtomicInteger(0);
179 nsuccesses = new AtomicInteger(0);
183 when(engine.getExecutorService()).thenReturn(exsvc);
185 feature = new MyLockingFeature(true);
189 public void tearDown() throws SQLException {
194 private void cleanDb() throws SQLException {
195 try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
196 stmt.executeUpdate();
200 private void shutdownFeature() {
201 if (feature != null) {
202 feature.afterStop(engine);
208 * Tests that the feature is found in the expected service sets.
211 public void testServiceApis() {
212 assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
213 .anyMatch(obj -> obj instanceof DistributedLockManager));
217 * Tests constructor() when properties are invalid.
220 public void testDistributedLockManagerInvalidProperties() {
221 // use properties containing an invalid value
222 Properties props = new Properties();
223 props.setProperty(DistributedLockProperties.EXPIRE_CHECK_SEC, "abc");
225 feature = new MyLockingFeature(false) {
227 protected Properties getProperties(String fileName) {
232 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
236 public void testGetSequenceNumber() {
237 assertEquals(1000, feature.getSequenceNumber());
241 public void testStartableApi() {
242 assertTrue(feature.isAlive());
243 assertTrue(feature.start());
244 assertTrue(feature.stop());
247 // above should have had no effect
248 assertTrue(feature.isAlive());
250 feature.afterStop(engine);
251 assertFalse(feature.isAlive());
255 public void testLockApi() {
256 assertFalse(feature.isLocked());
257 assertTrue(feature.lock());
258 assertTrue(feature.unlock());
262 public void testBeforeCreateLockManager() {
263 assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
267 * Tests beforeCreate(), when getProperties() throws a runtime exception.
270 public void testBeforeCreateLockManagerEx() {
273 feature = new MyLockingFeature(false) {
275 protected Properties getProperties(String fileName) {
276 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
280 assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
281 .isInstanceOf(DistributedLockManagerException.class);
285 public void testAfterStart() {
286 // verify that cleanup & expire check are both added to the queue
287 verify(exsvc).execute(any());
288 verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
292 * Tests afterStart(), when thread pool throws a runtime exception.
295 public void testAfterStartExInThreadPool() {
298 feature = new MyLockingFeature(false);
300 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
301 .thenThrow(new IllegalArgumentException(EXPECTED_EXCEPTION));
303 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
307 public void testDeleteExpiredDbLocks() throws SQLException {
308 // add records: two expired, one not
309 insertRecord(RESOURCE, feature.getUuidString(), -1);
310 insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
311 insertRecord(RESOURCE3, OTHER_OWNER, 0);
312 insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
314 // get the clean-up function and execute it
315 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
316 verify(exsvc).execute(captor.capture());
318 long tbegin = System.currentTimeMillis();
319 Runnable action = captor.getValue();
322 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
323 assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
324 assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
325 assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
327 assertEquals(2, getRecordCount());
331 * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
333 * @throws SQLException if an error occurs
336 public void testDeleteExpiredDbLocksEx() throws SQLException {
337 feature = new InvalidDbLockingFeature(TRANSIENT);
339 // get the clean-up function and execute it
340 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
341 verify(exsvc).execute(captor.capture());
343 Runnable action = captor.getValue();
345 // should not throw an exception
350 public void testAfterStop() {
353 feature = new DistributedLockManager();
355 // shutdown without calling afterStart()
361 * Tests afterStop(), when the data source throws an exception when close() is called.
363 * @throws SQLException if an error occurs
366 public void testAfterStopEx() throws SQLException {
369 // use a data source that throws an exception when closed
370 feature = new InvalidDbLockingFeature(TRANSIENT);
376 public void testCreateLock() throws SQLException {
377 verify(exsvc).execute(any());
379 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
380 assertTrue(lock.isWaiting());
382 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
384 // this lock should fail
385 LockCallback callback2 = mock(LockCallback.class);
386 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
387 assertTrue(lock2.isUnavailable());
388 verify(callback2, never()).lockAvailable(lock2);
389 verify(callback2).lockUnavailable(lock2);
391 // this should fail, too
392 LockCallback callback3 = mock(LockCallback.class);
393 DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
394 assertTrue(lock3.isUnavailable());
395 verify(callback3, never()).lockAvailable(lock3);
396 verify(callback3).lockUnavailable(lock3);
398 // no change to first
399 assertTrue(lock.isWaiting());
401 // no callbacks to the first lock
402 verify(callback, never()).lockAvailable(lock);
403 verify(callback, never()).lockUnavailable(lock);
405 assertTrue(lock.isWaiting());
406 assertEquals(0, getRecordCount());
409 assertTrue(lock.isActive());
410 assertEquals(1, getRecordCount());
412 verify(callback).lockAvailable(lock);
413 verify(callback, never()).lockUnavailable(lock);
415 // this should succeed
416 DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
417 assertTrue(lock4.isWaiting());
419 // after running checker, original records should still remain
420 runChecker(0, 0, EXPIRE_SEC);
421 assertEquals(1, getRecordCount());
422 verify(callback, never()).lockUnavailable(lock);
426 * Tests lock() when the feature is not the latest instance.
429 public void testCreateLockNotLatestInstance() {
430 DistributedLockManager.setLatestInstance(null);
432 Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
433 assertTrue(lock.isUnavailable());
434 verify(callback, never()).lockAvailable(any());
435 verify(callback).lockUnavailable(lock);
439 public void testCheckExpired() throws SQLException {
440 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
443 LockCallback callback2 = mock(LockCallback.class);
444 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
447 LockCallback callback3 = mock(LockCallback.class);
448 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
451 LockCallback callback4 = mock(LockCallback.class);
452 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
455 LockCallback callback5 = mock(LockCallback.class);
456 final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
459 assertEquals(5, getRecordCount());
462 updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
464 // change host of another record
465 updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
467 // change uuid of another record
468 updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
472 runChecker(0, 0, EXPIRE_SEC);
476 assertTrue(lock.isUnavailable());
477 assertTrue(lock2.isActive());
478 assertTrue(lock3.isUnavailable());
479 assertTrue(lock4.isActive());
480 assertTrue(lock5.isUnavailable());
486 verify(callback).lockUnavailable(lock);
487 verify(callback3).lockUnavailable(lock3);
488 verify(callback5).lockUnavailable(lock5);
490 verify(callback2, never()).lockUnavailable(lock2);
491 verify(callback4, never()).lockUnavailable(lock4);
494 // another check should have been scheduled, with the normal interval
495 runChecker(1, 0, EXPIRE_SEC);
499 * Tests checkExpired(), when schedule() throws an exception.
502 public void testCheckExpiredExecRejected() {
503 // arrange for execution to be rejected
504 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
505 .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
507 runChecker(0, 0, EXPIRE_SEC);
511 * Tests checkExpired(), when getConnection() throws an exception.
514 public void testCheckExpiredSqlEx() {
515 // use a data source that throws an exception when getConnection() is called
516 feature = new InvalidDbLockingFeature(TRANSIENT);
518 runChecker(0, 0, EXPIRE_SEC);
520 // it should have scheduled another check, sooner
521 runChecker(0, 0, RETRY_SEC);
525 public void testExpireLocks() throws SQLException {
526 AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
528 feature = new MyLockingFeature(true) {
530 protected BasicDataSource makeDataSource() throws Exception {
531 // get the real data source
532 BasicDataSource src2 = super.makeDataSource();
534 when(datasrc.getConnection()).thenAnswer(answer -> {
535 DistributedLock lck = freeLock.getAndSet(null);
544 return src2.getConnection();
551 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
554 LockCallback callback2 = mock(LockCallback.class);
555 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
558 LockCallback callback3 = mock(LockCallback.class);
559 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
560 // don't run doLock for lock3 - leave it in the waiting state
562 LockCallback callback4 = mock(LockCallback.class);
563 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
566 assertEquals(3, getRecordCount());
569 updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
571 // arrange to free lock4 while the checker is running
575 runChecker(0, 0, EXPIRE_SEC);
579 assertTrue(lock.isUnavailable());
580 assertTrue(lock2.isActive());
581 assertTrue(lock3.isWaiting());
582 assertTrue(lock4.isUnavailable());
585 verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any());
587 verify(callback).lockUnavailable(lock);
588 verify(callback2, never()).lockUnavailable(lock2);
589 verify(callback3, never()).lockUnavailable(lock3);
590 verify(callback4, never()).lockUnavailable(lock4);
594 public void testDistributedLockNoArgs() {
595 DistributedLock lock = new DistributedLock();
596 assertNull(lock.getResourceId());
597 assertNull(lock.getOwnerKey());
598 assertNull(lock.getCallback());
599 assertEquals(0, lock.getHoldSec());
603 public void testDistributedLock() {
604 assertThatIllegalArgumentException()
605 .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
606 .withMessageContaining("holdSec");
608 // should generate no exception
609 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
613 public void testDistributedLockSerializable() throws Exception {
614 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
615 lock = roundTrip(lock);
617 assertTrue(lock.isWaiting());
619 assertEquals(RESOURCE, lock.getResourceId());
620 assertEquals(OWNER_KEY, lock.getOwnerKey());
621 assertNull(lock.getCallback());
622 assertEquals(HOLD_SEC, lock.getHoldSec());
626 public void testGrant() {
627 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
628 assertFalse(lock.isActive());
630 // execute the doLock() call
633 assertTrue(lock.isActive());
635 // the callback for the lock should have been run in the foreground thread
636 verify(callback).lockAvailable(lock);
640 * Tests grant() when the lock is already unavailable.
643 public void testDistributedLockGrantUnavailable() {
644 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
645 lock.setState(LockState.UNAVAILABLE);
648 assertTrue(lock.isUnavailable());
649 verify(callback, never()).lockAvailable(any());
650 verify(callback, never()).lockUnavailable(any());
654 public void testDistributedLockDeny() {
656 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
658 // get another lock - should fail
659 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
661 assertTrue(lock.isUnavailable());
663 // the callback for the second lock should have been run in the foreground thread
664 verify(callback).lockUnavailable(lock);
666 // should only have a request for the first lock
667 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
671 public void testDistributedLockFree() {
672 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
674 assertTrue(lock.free());
675 assertTrue(lock.isUnavailable());
677 // run both requests associated with the lock
681 // should not have changed state
682 assertTrue(lock.isUnavailable());
684 // attempt to free it again
685 assertFalse(lock.free());
687 // should not have queued anything else
688 verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
690 // new lock should succeed
691 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
692 assertTrue(lock2 != lock);
693 assertTrue(lock2.isWaiting());
697 * Tests that free() works on a serialized lock with a new feature.
699 * @throws Exception if an error occurs
702 public void testDistributedLockFreeSerialized() throws Exception {
703 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
705 feature = new MyLockingFeature(true);
707 lock = roundTrip(lock);
708 assertTrue(lock.free());
709 assertTrue(lock.isUnavailable());
713 * Tests free() on a serialized lock without a feature.
715 * @throws Exception if an error occurs
718 public void testDistributedLockFreeNoFeature() throws Exception {
719 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
721 DistributedLockManager.setLatestInstance(null);
723 lock = roundTrip(lock);
724 assertFalse(lock.free());
725 assertTrue(lock.isUnavailable());
729 * Tests the case where the lock is freed and doUnlock called between the call to
730 * isUnavailable() and the call to compute().
733 public void testDistributedLockFreeUnlocked() {
734 feature = new FreeWithFreeLockingFeature(true);
736 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
738 assertFalse(lock.free());
739 assertTrue(lock.isUnavailable());
743 * Tests the case where the lock is freed, but doUnlock is not completed, between the
744 * call to isUnavailable() and the call to compute().
747 public void testDistributedLockFreeLockFreed() {
748 feature = new FreeWithFreeLockingFeature(false);
750 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
752 assertFalse(lock.free());
753 assertTrue(lock.isUnavailable());
757 public void testDistributedLockExtend() {
758 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
760 // lock2 should be denied - called back by this thread
761 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
762 verify(callback, never()).lockAvailable(lock2);
763 verify(callback).lockUnavailable(lock2);
765 // lock2 will still be denied - called back by this thread
766 lock2.extend(HOLD_SEC, callback);
767 verify(callback, times(2)).lockUnavailable(lock2);
769 // force lock2 to be active - should still be denied
770 Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
771 lock2.extend(HOLD_SEC, callback);
772 verify(callback, times(3)).lockUnavailable(lock2);
774 assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
775 .withMessageContaining("holdSec");
779 assertTrue(lock.isActive());
781 // now extend the first lock
782 LockCallback callback2 = mock(LockCallback.class);
783 lock.extend(HOLD_SEC2, callback2);
784 assertTrue(lock.isWaiting());
786 // execute doExtend()
788 lock.extend(HOLD_SEC2, callback2);
789 assertEquals(HOLD_SEC2, lock.getHoldSec());
790 verify(callback2).lockAvailable(lock);
791 verify(callback2, never()).lockUnavailable(lock);
795 * Tests that extend() works on a serialized lock with a new feature.
797 * @throws Exception if an error occurs
800 public void testDistributedLockExtendSerialized() throws Exception {
801 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
805 assertTrue(lock.isActive());
807 feature = new MyLockingFeature(true);
809 lock = roundTrip(lock);
810 assertTrue(lock.isActive());
812 LockCallback scallback = mock(LockCallback.class);
814 lock.extend(HOLD_SEC, scallback);
815 assertTrue(lock.isWaiting());
817 // run doExtend (in new feature)
819 assertTrue(lock.isActive());
821 verify(scallback).lockAvailable(lock);
822 verify(scallback, never()).lockUnavailable(lock);
826 * Tests extend() on a serialized lock without a feature.
828 * @throws Exception if an error occurs
831 public void testDistributedLockExtendNoFeature() throws Exception {
832 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
836 assertTrue(lock.isActive());
838 DistributedLockManager.setLatestInstance(null);
840 lock = roundTrip(lock);
841 assertTrue(lock.isActive());
843 LockCallback scallback = mock(LockCallback.class);
845 lock.extend(HOLD_SEC, scallback);
846 assertTrue(lock.isUnavailable());
848 verify(scallback, never()).lockAvailable(lock);
849 verify(scallback).lockUnavailable(lock);
853 * Tests the case where the lock is freed and doUnlock called between the call to
854 * isUnavailable() and the call to compute().
857 public void testDistributedLockExtendUnlocked() {
858 feature = new FreeWithFreeLockingFeature(true);
860 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
862 lock.extend(HOLD_SEC2, callback);
863 assertTrue(lock.isUnavailable());
864 verify(callback).lockUnavailable(lock);
868 * Tests the case where the lock is freed, but doUnlock is not completed, between the
869 * call to isUnavailable() and the call to compute().
872 public void testDistributedLockExtendLockFreed() {
873 feature = new FreeWithFreeLockingFeature(false);
875 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
877 lock.extend(HOLD_SEC2, callback);
878 assertTrue(lock.isUnavailable());
879 verify(callback).lockUnavailable(lock);
883 public void testDistributedLockScheduleRequest() {
884 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
887 verify(callback).lockAvailable(lock);
891 public void testDistributedLockRescheduleRequest() throws SQLException {
892 // use a data source that throws an exception when getConnection() is called
893 InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
896 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
898 // invoke doLock - should fail and reschedule
901 // should still be waiting
902 assertTrue(lock.isWaiting());
903 verify(callback, never()).lockUnavailable(lock);
905 // free the lock while doLock is executing
906 invfeat.freeLock = true;
908 // try scheduled request - should just invoke doUnlock
911 // should still be waiting
912 assertTrue(lock.isUnavailable());
913 verify(callback, never()).lockUnavailable(lock);
915 // should have scheduled a retry of doUnlock
916 verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
920 public void testDistributedLockGetNextRequest() {
921 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
924 * run doLock. This should cause getNextRequest() to be called twice, once with a
925 * request in the queue, and the second time with request=null.
931 * Tests getNextRequest(), where the same request is still in the queue the second
935 public void testDistributedLockGetNextRequestSameRequest() {
936 // force reschedule to be invoked
937 feature = new InvalidDbLockingFeature(TRANSIENT);
939 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
942 * run doLock. This should cause getNextRequest() to be called twice, once with a
943 * request in the queue, and the second time with the same request again.
947 verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
951 public void testDistributedLockDoRequest() throws SQLException {
952 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
954 assertTrue(lock.isWaiting());
956 // run doLock via doRequest
959 assertTrue(lock.isActive());
963 * Tests doRequest(), when doRequest() is already running within another thread.
966 public void testDistributedLockDoRequestBusy() {
968 * this feature will invoke a request in a background thread while it's being run
969 * in a foreground thread.
971 AtomicBoolean running = new AtomicBoolean(false);
972 AtomicBoolean returned = new AtomicBoolean(false);
974 feature = new MyLockingFeature(true) {
976 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
977 LockCallback callback) {
978 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
979 private static final long serialVersionUID = 1L;
982 protected boolean doDbInsert(Connection conn) throws SQLException {
984 // already inside the thread - don't recurse any further
985 return super.doDbInsert(conn);
990 Thread thread = new Thread(() -> {
991 // run doLock from within the new thread
994 thread.setDaemon(true);
997 // wait for the background thread to complete before continuing
1000 } catch (InterruptedException ignore) {
1001 Thread.currentThread().interrupt();
1004 returned.set(!thread.isAlive());
1006 return super.doDbInsert(conn);
1012 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1017 assertTrue(returned.get());
1021 * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1023 * @throws SQLException if an error occurs
1026 public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1027 // throw run-time exception
1028 when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1030 // use a data source that throws an exception when getConnection() is called
1031 feature = new MyLockingFeature(true) {
1033 protected BasicDataSource makeDataSource() throws Exception {
1038 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1040 // invoke doLock - should NOT reschedule
1043 assertTrue(lock.isUnavailable());
1044 verify(callback).lockUnavailable(lock);
1046 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1050 * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1053 * @throws SQLException if an error occurs
1056 public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1057 // throw run-time exception
1058 when(datasrc.getConnection()).thenAnswer(answer -> {
1060 throw new IllegalStateException(EXPECTED_EXCEPTION);
1063 // use a data source that throws an exception when getConnection() is called
1064 feature = new MyLockingFeature(true) {
1066 protected BasicDataSource makeDataSource() throws Exception {
1071 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1073 // invoke doLock - should NOT reschedule
1076 assertTrue(lock.isUnavailable());
1077 verify(callback, never()).lockUnavailable(lock);
1079 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1083 * Tests doRequest() when the retry count gets exhausted.
1086 public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1087 // use a data source that throws an exception when getConnection() is called
1088 feature = new InvalidDbLockingFeature(TRANSIENT);
1090 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1092 // invoke doLock - should fail and reschedule
1095 // should still be waiting
1096 assertTrue(lock.isWaiting());
1097 verify(callback, never()).lockUnavailable(lock);
1099 // try again, via SCHEDULER - first retry fails
1102 // should still be waiting
1103 assertTrue(lock.isWaiting());
1104 verify(callback, never()).lockUnavailable(lock);
1106 // try again, via SCHEDULER - final retry fails
1108 assertTrue(lock.isUnavailable());
1110 // now callback should have been called
1111 verify(callback).lockUnavailable(lock);
1115 * Tests doRequest() when a non-transient DB exception is thrown.
1118 public void testDistributedLockDoRequestNotTransient() {
1120 * use a data source that throws a PERMANENT exception when getConnection() is
1123 feature = new InvalidDbLockingFeature(PERMANENT);
1125 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1127 // invoke doLock - should fail
1130 assertTrue(lock.isUnavailable());
1131 verify(callback).lockUnavailable(lock);
1133 // should not have scheduled anything new
1134 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1135 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1139 public void testDistributedLockDoLock() throws SQLException {
1140 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1142 // invoke doLock - should simply do an insert
1143 long tbegin = System.currentTimeMillis();
1146 assertEquals(1, getRecordCount());
1147 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1148 verify(callback).lockAvailable(lock);
1152 * Tests doLock() when the lock is freed before doLock runs.
1154 * @throws SQLException if an error occurs
1157 public void testDistributedLockDoLockFreed() throws SQLException {
1158 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1160 lock.setState(LockState.UNAVAILABLE);
1162 // invoke doLock - should do nothing
1165 assertEquals(0, getRecordCount());
1167 verify(callback, never()).lockAvailable(lock);
1171 * Tests doLock() when a DB exception is thrown.
1174 public void testDistributedLockDoLockEx() {
1175 // use a data source that throws an exception when getConnection() is called
1176 feature = new InvalidDbLockingFeature(PERMANENT);
1178 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1180 // invoke doLock - should simply do an insert
1183 // lock should have failed due to exception
1184 verify(callback).lockUnavailable(lock);
1188 * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1192 public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1193 // insert an expired record
1194 insertRecord(RESOURCE, feature.getUuidString(), 0);
1196 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1198 // invoke doLock - should simply do an update
1200 verify(callback).lockAvailable(lock);
1204 * Tests doLock() when a locked record already exists.
1207 public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1208 // insert an expired record
1209 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1211 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1216 // lock should have failed because it's already locked
1217 verify(callback).lockUnavailable(lock);
1221 public void testDistributedLockDoUnlock() throws SQLException {
1222 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1229 // invoke doUnlock()
1230 long tbegin = System.currentTimeMillis();
1233 assertEquals(0, getRecordCount());
1234 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1236 assertTrue(lock.isUnavailable());
1238 // no more callbacks should have occurred
1239 verify(callback, times(1)).lockAvailable(lock);
1240 verify(callback, never()).lockUnavailable(lock);
1244 * Tests doUnlock() when a DB exception is thrown.
1246 * @throws SQLException if an error occurs
1249 public void testDistributedLockDoUnlockEx() throws SQLException {
1250 feature = new InvalidDbLockingFeature(PERMANENT);
1252 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1254 // do NOT invoke doLock() - it will fail without a DB connection
1258 // invoke doUnlock()
1261 assertTrue(lock.isUnavailable());
1263 // no more callbacks should have occurred
1264 verify(callback, never()).lockAvailable(lock);
1265 verify(callback, never()).lockUnavailable(lock);
1269 public void testDistributedLockDoExtend() throws SQLException {
1270 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1273 LockCallback callback2 = mock(LockCallback.class);
1274 lock.extend(HOLD_SEC2, callback2);
1277 long tbegin = System.currentTimeMillis();
1280 assertEquals(1, getRecordCount());
1281 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1283 assertTrue(lock.isActive());
1285 // no more callbacks should have occurred
1286 verify(callback).lockAvailable(lock);
1287 verify(callback, never()).lockUnavailable(lock);
1289 // extension should have succeeded
1290 verify(callback2).lockAvailable(lock);
1291 verify(callback2, never()).lockUnavailable(lock);
1295 * Tests doExtend() when the lock is freed before doExtend runs.
1297 * @throws SQLException if an error occurs
1300 public void testDistributedLockDoExtendFreed() throws SQLException {
1301 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1302 lock.extend(HOLD_SEC2, callback);
1304 lock.setState(LockState.UNAVAILABLE);
1306 // invoke doExtend - should do nothing
1309 assertEquals(0, getRecordCount());
1311 verify(callback, never()).lockAvailable(lock);
1315 * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1318 * @throws SQLException if an error occurs
1321 public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1322 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1325 LockCallback callback2 = mock(LockCallback.class);
1326 lock.extend(HOLD_SEC2, callback2);
1328 // delete the record so it's forced to re-insert it
1332 long tbegin = System.currentTimeMillis();
1335 assertEquals(1, getRecordCount());
1336 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1338 assertTrue(lock.isActive());
1340 // no more callbacks should have occurred
1341 verify(callback).lockAvailable(lock);
1342 verify(callback, never()).lockUnavailable(lock);
1344 // extension should have succeeded
1345 verify(callback2).lockAvailable(lock);
1346 verify(callback2, never()).lockUnavailable(lock);
1350 * Tests doExtend() when both update and insert fail.
1352 * @throws SQLException if an error occurs
1355 public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1357 * this feature will create a lock that returns false when doDbUpdate() is
1358 * invoked, or when doDbInsert() is invoked a second time
1360 feature = new MyLockingFeature(true) {
1362 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1363 LockCallback callback) {
1364 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1365 private static final long serialVersionUID = 1L;
1366 private int ntimes = 0;
1369 protected boolean doDbInsert(Connection conn) throws SQLException {
1374 return super.doDbInsert(conn);
1378 protected boolean doDbUpdate(Connection conn) {
1385 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1388 LockCallback callback2 = mock(LockCallback.class);
1389 lock.extend(HOLD_SEC2, callback2);
1394 assertTrue(lock.isUnavailable());
1396 // no more callbacks should have occurred
1397 verify(callback).lockAvailable(lock);
1398 verify(callback, never()).lockUnavailable(lock);
1400 // extension should have failed
1401 verify(callback2, never()).lockAvailable(lock);
1402 verify(callback2).lockUnavailable(lock);
1406 * Tests doExtend() when an exception occurs.
1408 * @throws SQLException if an error occurs
1411 public void testDistributedLockDoExtendEx() throws SQLException {
1412 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1416 * delete the record and insert one with a different owner, which will cause
1417 * doDbInsert() to throw an exception
1420 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1422 LockCallback callback2 = mock(LockCallback.class);
1423 lock.extend(HOLD_SEC2, callback2);
1428 assertTrue(lock.isUnavailable());
1430 // no more callbacks should have occurred
1431 verify(callback).lockAvailable(lock);
1432 verify(callback, never()).lockUnavailable(lock);
1434 // extension should have failed
1435 verify(callback2, never()).lockAvailable(lock);
1436 verify(callback2).lockUnavailable(lock);
1440 public void testDistributedLockToString() {
1441 String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1442 assertNotNull(text);
1443 assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1447 public void testMakeThreadPool() {
1448 // use a REAL feature to test this
1449 feature = new DistributedLockManager();
1451 // this should create a thread pool
1452 feature.beforeCreateLockManager(engine, new Properties());
1453 feature.afterStart(engine);
1459 * Performs a multi-threaded test of the locking facility.
1461 * @throws InterruptedException if the current thread is interrupted while waiting for
1462 * the background threads to complete
1465 public void testMultiThreaded() throws InterruptedException {
1466 feature = new DistributedLockManager();
1467 feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1468 feature.afterStart(PolicyEngineConstants.getManager());
1470 List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1471 for (int x = 0; x < MAX_THREADS; ++x) {
1472 threads.add(new MyThread());
1475 threads.forEach(Thread::start);
1477 for (MyThread thread : threads) {
1479 assertFalse(thread.isAlive());
1482 for (MyThread thread : threads) {
1483 if (thread.err != null) {
1488 assertTrue(nsuccesses.get() > 0);
1491 private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1492 boolean waitForLock) {
1493 return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1496 private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1497 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1498 try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1499 oos.writeObject(lock);
1502 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1503 try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1504 return (DistributedLock) ois.readObject();
1509 * Runs the checkExpired() action.
1511 * @param nskip number of actions in the work queue to skip
1512 * @param nadditional number of additional actions that appear in the work queue
1513 * <i>after</i> the checkExpired action
1514 * @param schedSec number of seconds for which the checker should have been scheduled
1516 private void runChecker(int nskip, int nadditional, long schedSec) {
1517 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1518 verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1519 Runnable action = captor.getAllValues().get(nskip);
1524 * Runs a lock action (e.g., doLock, doUnlock).
1526 * @param nskip number of actions in the work queue to skip
1527 * @param nadditional number of additional actions that appear in the work queue
1528 * <i>after</i> the desired action
1530 void runLock(int nskip, int nadditional) {
1531 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1532 verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1534 Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1539 * Runs a scheduled action (e.g., "retry" action).
1541 * @param nskip number of actions in the work queue to skip
1542 * @param nadditional number of additional actions that appear in the work queue
1543 * <i>after</i> the desired action
1545 void runSchedule(int nskip, int nadditional) {
1546 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1547 verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1549 Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1554 * Gets a count of the number of lock records in the DB.
1556 * @return the number of lock records in the DB
1557 * @throws SQLException if an error occurs accessing the DB
1559 private int getRecordCount() throws SQLException {
1560 try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1561 ResultSet result = stmt.executeQuery()) {
1563 if (result.next()) {
1564 return result.getInt(1);
1573 * Determines if there is a record for the given resource whose expiration time is in
1574 * the expected range.
1576 * @param resourceId ID of the resource of interest
1577 * @param uuidString UUID string of the owner
1578 * @param holdSec seconds for which the lock was to be held
1579 * @param tbegin earliest time, in milliseconds, at which the record could have been
1580 * inserted into the DB
1581 * @return {@code true} if a record is found, {@code false} otherwise
1582 * @throws SQLException if an error occurs accessing the DB
1584 private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1585 try (PreparedStatement stmt =
1586 conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1587 + " WHERE resourceId=? AND host=? AND owner=?")) {
1589 stmt.setString(1, resourceId);
1590 stmt.setString(2, feature.getHostName());
1591 stmt.setString(3, uuidString);
1593 try (ResultSet result = stmt.executeQuery()) {
1594 if (result.next()) {
1595 int remaining = result.getInt(1);
1596 long maxDiff = System.currentTimeMillis() - tbegin;
1597 return (remaining >= 0 && holdSec - remaining <= maxDiff);
1607 * Inserts a record into the DB.
1609 * @param resourceId ID of the resource of interest
1610 * @param uuidString UUID string of the owner
1611 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1612 * @throws SQLException if an error occurs accessing the DB
1614 private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1615 this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
1618 private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1619 throws SQLException {
1620 try (PreparedStatement stmt =
1621 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1622 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1624 stmt.setString(1, resourceId);
1625 stmt.setString(2, hostName);
1626 stmt.setString(3, uuidString);
1627 stmt.setInt(4, expireOffset);
1629 assertEquals(1, stmt.executeUpdate());
1634 * Updates a record in the DB.
1636 * @param resourceId ID of the resource of interest
1637 * @param newUuid UUID string of the <i>new</i> owner
1638 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1639 * @throws SQLException if an error occurs accessing the DB
1641 private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1642 try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1643 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1645 stmt.setString(1, newHost);
1646 stmt.setString(2, newUuid);
1647 stmt.setInt(3, expireOffset);
1648 stmt.setString(4, resourceId);
1650 assertEquals(1, stmt.executeUpdate());
1655 * Feature that uses <i>exsvc</i> to execute requests.
1657 private class MyLockingFeature extends DistributedLockManager {
1659 public MyLockingFeature(boolean init) {
1662 exsvc = mock(ScheduledExecutorService.class);
1663 when(engine.getExecutorService()).thenReturn(exsvc);
1666 beforeCreateLockManager(engine, new Properties());
1673 * Feature whose data source all throws exceptions.
1675 private class InvalidDbLockingFeature extends MyLockingFeature {
1676 private int errcode;
1677 private boolean freeLock = false;
1679 public InvalidDbLockingFeature(int errcode) {
1680 // pass "false" because we have to set the error code BEFORE calling
1684 this.errcode = errcode;
1686 this.beforeCreateLockManager(engine, new Properties());
1687 this.afterStart(engine);
1691 protected BasicDataSource makeDataSource() throws Exception {
1692 when(datasrc.getConnection()).thenAnswer(answer -> {
1698 throw new SQLException(EXPECTED_EXCEPTION, "", errcode);
1701 doThrow(new SQLException(EXPECTED_EXCEPTION, "", errcode)).when(datasrc).close();
1708 * Feature whose locks free themselves while free() is already running.
1710 private class FreeWithFreeLockingFeature extends MyLockingFeature {
1711 private boolean relock;
1713 public FreeWithFreeLockingFeature(boolean relock) {
1715 this.relock = relock;
1719 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1720 LockCallback callback) {
1722 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1723 private static final long serialVersionUID = 1L;
1724 private boolean checked = false;
1727 public boolean isUnavailable() {
1729 return super.isUnavailable();
1734 // release and relock
1742 createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1752 * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1753 * extend it, and then unlock it.
1755 private class MyThread extends Thread {
1756 AssertionError err = null;
1765 for (int x = 0; x < MAX_LOOPS; ++x) {
1769 } catch (AssertionError e) {
1774 private void makeAttempt() {
1776 Semaphore sem = new Semaphore(0);
1778 LockCallback cb = new LockCallback() {
1780 public void lockAvailable(Lock lock) {
1785 public void lockUnavailable(Lock lock) {
1790 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1792 // wait for callback, whether available or unavailable
1793 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1794 if (!lock.isActive()) {
1798 nsuccesses.incrementAndGet();
1800 assertEquals(1, nactive.incrementAndGet());
1802 lock.extend(HOLD_SEC2, cb);
1803 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1804 assertTrue(lock.isActive());
1806 // decrement BEFORE free()
1807 nactive.decrementAndGet();
1809 assertTrue(lock.free());
1810 assertTrue(lock.isUnavailable());
1812 } catch (InterruptedException e) {
1813 Thread.currentThread().interrupt();
1814 throw new AssertionError("interrupted", e);