2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.distributed.locking;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
27 import static org.assertj.core.api.Assertions.assertThatThrownBy;
28 import static org.junit.jupiter.api.Assertions.assertEquals;
29 import static org.junit.jupiter.api.Assertions.assertFalse;
30 import static org.junit.jupiter.api.Assertions.assertNotNull;
31 import static org.junit.jupiter.api.Assertions.assertNotSame;
32 import static org.junit.jupiter.api.Assertions.assertNull;
33 import static org.junit.jupiter.api.Assertions.assertSame;
34 import static org.junit.jupiter.api.Assertions.assertTrue;
35 import static org.mockito.ArgumentMatchers.any;
36 import static org.mockito.ArgumentMatchers.anyBoolean;
37 import static org.mockito.ArgumentMatchers.anyLong;
38 import static org.mockito.ArgumentMatchers.eq;
39 import static org.mockito.Mockito.doThrow;
40 import static org.mockito.Mockito.lenient;
41 import static org.mockito.Mockito.mock;
42 import static org.mockito.Mockito.never;
43 import static org.mockito.Mockito.times;
44 import static org.mockito.Mockito.verify;
45 import static org.mockito.Mockito.when;
47 import java.io.ByteArrayInputStream;
48 import java.io.ByteArrayOutputStream;
49 import java.io.ObjectInputStream;
50 import java.io.ObjectOutputStream;
51 import java.io.Serial;
52 import java.sql.Connection;
53 import java.sql.DriverManager;
54 import java.sql.PreparedStatement;
55 import java.sql.ResultSet;
56 import java.sql.SQLException;
57 import java.sql.SQLTransientException;
58 import java.util.ArrayList;
59 import java.util.List;
60 import java.util.Properties;
61 import java.util.concurrent.Executors;
62 import java.util.concurrent.RejectedExecutionException;
63 import java.util.concurrent.ScheduledExecutorService;
64 import java.util.concurrent.ScheduledFuture;
65 import java.util.concurrent.Semaphore;
66 import java.util.concurrent.TimeUnit;
67 import java.util.concurrent.atomic.AtomicBoolean;
68 import java.util.concurrent.atomic.AtomicInteger;
69 import java.util.concurrent.atomic.AtomicReference;
70 import org.apache.commons.dbcp2.BasicDataSource;
71 import org.junit.jupiter.api.AfterAll;
72 import org.junit.jupiter.api.AfterEach;
73 import org.junit.jupiter.api.BeforeAll;
74 import org.junit.jupiter.api.BeforeEach;
75 import org.junit.jupiter.api.Test;
76 import org.junit.jupiter.api.extension.ExtendWith;
77 import org.kie.api.runtime.KieSession;
78 import org.mockito.ArgumentCaptor;
79 import org.mockito.Mock;
80 import org.mockito.MockitoAnnotations;
81 import org.mockito.junit.jupiter.MockitoExtension;
82 import org.onap.policy.common.utils.services.OrderedServiceImpl;
83 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
84 import org.onap.policy.drools.core.PolicySession;
85 import org.onap.policy.drools.core.lock.Lock;
86 import org.onap.policy.drools.core.lock.LockCallback;
87 import org.onap.policy.drools.core.lock.LockState;
88 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
89 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
90 import org.onap.policy.drools.system.PolicyEngine;
91 import org.onap.policy.drools.system.PolicyEngineConstants;
92 import org.springframework.test.util.ReflectionTestUtils;
94 @ExtendWith(MockitoExtension.class)
95 class DistributedLockManagerTest {
96 private static final long EXPIRE_SEC = 900L;
97 private static final long RETRY_SEC = 60L;
98 private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
99 private static final String OTHER_HOST = "other-host";
100 private static final String OTHER_OWNER = "other-owner";
101 private static final String EXPECTED_EXCEPTION = "expected exception";
102 private static final String DB_CONNECTION =
103 "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
104 private static final String DB_USER = "user";
105 private static final String DB_PASSWORD = "password";
106 private static final String OWNER_KEY = "my key";
107 private static final String RESOURCE = "my resource";
108 private static final String RESOURCE2 = "my resource #2";
109 private static final String RESOURCE3 = "my resource #3";
110 private static final String RESOURCE4 = "my resource #4";
111 private static final String RESOURCE5 = "my resource #5";
112 private static final int HOLD_SEC = 100;
113 private static final int HOLD_SEC2 = 120;
114 private static final int MAX_THREADS = 5;
115 private static final int MAX_LOOPS = 100;
116 private static final boolean TRANSIENT = true;
117 private static final boolean PERMANENT = false;
119 // number of execute() calls before the first lock attempt
120 private static final int PRE_LOCK_EXECS = 1;
122 // number of execute() calls before the first schedule attempt
123 private static final int PRE_SCHED_EXECS = 1;
125 private static Connection conn = null;
126 private static ScheduledExecutorService saveExec;
127 private static ScheduledExecutorService realExec;
130 private PolicyEngine engine;
133 private KieSession kieSess;
136 private ScheduledExecutorService exsvc;
139 private ScheduledFuture<?> checker;
142 private LockCallback callback;
145 private BasicDataSource datasrc;
147 private DistributedLock lock;
149 private AtomicInteger nactive;
150 private AtomicInteger nsuccesses;
151 private DistributedLockManager feature;
153 AutoCloseable closeable;
156 * Configures the location of the property files and creates the DB.
158 * @throws SQLException if the DB cannot be created
161 static void setUpBeforeClass() throws SQLException {
162 SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
163 PolicyEngineConstants.getManager().configure(new Properties());
165 conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
167 try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
168 + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
169 + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
170 createStmt.executeUpdate();
173 saveExec = (ScheduledExecutorService) ReflectionTestUtils.getField(PolicyEngineConstants.getManager(),
174 POLICY_ENGINE_EXECUTOR_FIELD);
176 realExec = Executors.newScheduledThreadPool(3);
180 * Restores static fields.
183 static void tearDownAfterClass() throws SQLException {
184 ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
190 * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
193 * @throws SQLException if the lock records cannot be deleted from the DB
196 void setUp() throws SQLException {
197 closeable = MockitoAnnotations.openMocks(this);
198 // grant() and deny() calls will come through here and be immediately executed
199 PolicySession session = new PolicySession(null, null, kieSess) {
201 public void insertDrools(Object object) {
202 ((Runnable) object).run();
206 session.setPolicySession();
208 nactive = new AtomicInteger(0);
209 nsuccesses = new AtomicInteger(0);
213 feature = new MyLockingFeature(true);
217 void tearDown() throws Exception {
223 private void cleanDb() throws SQLException {
224 try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
225 stmt.executeUpdate();
229 private void shutdownFeature() {
230 if (feature != null) {
231 feature.afterStop(engine);
237 * Tests that the feature is found in the expected service sets.
240 void testServiceApis() {
241 assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
242 .anyMatch(obj -> obj instanceof DistributedLockManager));
246 void testGetSequenceNumber() {
247 assertEquals(1000, feature.getSequenceNumber());
251 void testBeforeCreateLockManager() {
252 assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
256 * Tests beforeCreate(), when getProperties() throws a runtime exception.
259 void testBeforeCreateLockManagerEx() {
262 feature = new MyLockingFeature(false) {
264 protected Properties getProperties(String fileName) {
265 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
269 Properties props = new Properties();
270 assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, props))
271 .isInstanceOf(DistributedLockManagerException.class);
275 void testAfterStart() {
276 // verify that cleanup & expire check are both added to the queue
277 verify(exsvc).execute(any());
278 verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
282 * Tests afterStart(), when thread pool throws a runtime exception.
285 void testAfterStartExInThreadPool() {
288 feature = new MyLockingFeature(false);
290 doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
292 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
296 void testDeleteExpiredDbLocks() throws SQLException {
297 // add records: two expired, one not
298 insertRecord(RESOURCE, feature.getUuidString(), -1);
299 insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
300 insertRecord(RESOURCE3, OTHER_OWNER, 0);
301 insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
303 // get the clean-up function and execute it
304 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
305 verify(exsvc).execute(captor.capture());
307 long tbegin = System.currentTimeMillis();
308 Runnable action = captor.getValue();
311 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
312 assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
313 assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
314 assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
316 assertEquals(2, getRecordCount());
320 * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
324 void testDeleteExpiredDbLocksEx() {
325 feature = new InvalidDbLockingFeature(TRANSIENT);
327 // get the clean-up function and execute it
328 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
329 verify(exsvc).execute(captor.capture());
331 Runnable action = captor.getValue();
333 // should not throw an exception
338 void testAfterStop() {
340 verify(checker).cancel(anyBoolean());
342 feature = new DistributedLockManager();
344 // shutdown without calling afterStart()
350 * Tests afterStop(), when the data source throws an exception when close() is called.
354 void testAfterStopEx() {
357 // use a data source that throws an exception when closed
358 feature = new InvalidDbLockingFeature(TRANSIENT);
360 assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
364 void testCreateLock() throws SQLException {
365 verify(exsvc).execute(any());
367 lock = getLock(RESOURCE, callback);
368 assertTrue(lock.isWaiting());
370 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
372 // this lock should fail
373 LockCallback callback2 = mock(LockCallback.class);
374 DistributedLock lock2 = getLock(RESOURCE, callback2);
375 assertTrue(lock2.isUnavailable());
376 verify(callback2, never()).lockAvailable(lock2);
377 verify(callback2).lockUnavailable(lock2);
379 // this should fail, too
380 LockCallback callback3 = mock(LockCallback.class);
381 DistributedLock lock3 = getLock(RESOURCE, callback3);
382 assertTrue(lock3.isUnavailable());
383 verify(callback3, never()).lockAvailable(lock3);
384 verify(callback3).lockUnavailable(lock3);
386 // no change to first
387 assertTrue(lock.isWaiting());
389 // no callbacks to the first lock
390 verify(callback, never()).lockAvailable(lock);
391 verify(callback, never()).lockUnavailable(lock);
393 assertTrue(lock.isWaiting());
394 assertEquals(0, getRecordCount());
397 assertTrue(lock.isActive());
398 assertEquals(1, getRecordCount());
400 verify(callback).lockAvailable(lock);
401 verify(callback, never()).lockUnavailable(lock);
403 // this should succeed
404 DistributedLock lock4 = getLock(RESOURCE2, callback);
405 assertTrue(lock4.isWaiting());
407 // after running checker, original records should still remain
408 runChecker(0, EXPIRE_SEC);
409 assertEquals(1, getRecordCount());
410 verify(callback, never()).lockUnavailable(lock);
414 * Tests createLock() when the feature is not the latest instance.
417 void testCreateLockNotLatestInstance() {
418 DistributedLockManager.setLatestInstance(null);
420 Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
421 assertTrue(lock.isUnavailable());
422 verify(callback, never()).lockAvailable(any());
423 verify(callback).lockUnavailable(lock);
427 void testCheckExpired() throws SQLException {
428 lock = getLock(RESOURCE, callback);
431 LockCallback callback2 = mock(LockCallback.class);
432 final DistributedLock lock2 = getLock(RESOURCE2, callback2);
435 LockCallback callback3 = mock(LockCallback.class);
436 final DistributedLock lock3 = getLock(RESOURCE3, callback3);
439 LockCallback callback4 = mock(LockCallback.class);
440 final DistributedLock lock4 = getLock(RESOURCE4, callback4);
443 LockCallback callback5 = mock(LockCallback.class);
444 final DistributedLock lock5 = getLock(RESOURCE5, callback5);
447 assertEquals(5, getRecordCount());
450 updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
452 // change host of another record
453 updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
455 // change uuid of another record
456 updateRecord(RESOURCE5, feature.getPdpName(), OTHER_OWNER, HOLD_SEC);
459 runChecker(0, EXPIRE_SEC);
462 assertTrue(lock.isUnavailable());
463 assertTrue(lock2.isActive());
464 assertTrue(lock3.isUnavailable());
465 assertTrue(lock4.isActive());
466 assertTrue(lock5.isUnavailable());
472 verify(callback).lockUnavailable(lock);
473 verify(callback3).lockUnavailable(lock3);
474 verify(callback5).lockUnavailable(lock5);
476 verify(callback2, never()).lockUnavailable(lock2);
477 verify(callback4, never()).lockUnavailable(lock4);
479 // another check should have been scheduled, with the normal interval
480 runChecker(1, EXPIRE_SEC);
484 * Tests checkExpired(), when schedule() throws an exception.
487 void testCheckExpiredExecRejected() {
488 // arrange for execution to be rejected
489 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
490 .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
492 runChecker(0, EXPIRE_SEC);
496 * Tests checkExpired(), when getConnection() throws an exception.
499 void testCheckExpiredSqlEx() {
500 // use a data source that throws an exception when getConnection() is called
501 feature = new InvalidDbLockingFeature(TRANSIENT);
503 runChecker(0, EXPIRE_SEC);
505 // it should have scheduled another check, sooner
506 runChecker(0, RETRY_SEC);
510 * Tests checkExpired(), when getConnection() throws an exception and the feature is
514 void testCheckExpiredSqlExFeatureStopped() {
515 // use a data source that throws an exception when getConnection() is called
516 feature = new InvalidDbLockingFeature(TRANSIENT) {
518 protected SQLException makeEx() {
520 return super.makeEx();
524 runChecker(0, EXPIRE_SEC);
526 // it should NOT have scheduled another check
527 verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
531 void testExpireLocks() throws SQLException {
532 AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
534 feature = new MyLockingFeature(true) {
536 protected BasicDataSource makeDataSource() throws Exception {
537 // get the real data source
538 BasicDataSource src2 = super.makeDataSource();
540 when(datasrc.getConnection()).thenAnswer(answer -> {
541 DistributedLock lck = freeLock.getAndSet(null);
550 return src2.getConnection();
557 lock = getLock(RESOURCE, callback);
560 LockCallback callback2 = mock(LockCallback.class);
561 final DistributedLock lock2 = getLock(RESOURCE2, callback2);
564 LockCallback callback3 = mock(LockCallback.class);
565 final DistributedLock lock3 = getLock(RESOURCE3, callback3);
566 // don't run doLock for lock3 - leave it in the waiting state
568 LockCallback callback4 = mock(LockCallback.class);
569 final DistributedLock lock4 = getLock(RESOURCE4, callback4);
572 assertEquals(3, getRecordCount());
575 updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
577 // arrange to free lock4 while the checker is running
581 runChecker(0, EXPIRE_SEC);
584 assertTrue(lock.isUnavailable());
585 assertTrue(lock2.isActive());
586 assertTrue(lock3.isWaiting());
587 assertTrue(lock4.isUnavailable());
590 verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
592 verify(callback).lockUnavailable(lock);
593 verify(callback2, never()).lockUnavailable(lock2);
594 verify(callback3, never()).lockUnavailable(lock3);
595 verify(callback4, never()).lockUnavailable(lock4);
599 void testDistributedLockNoArgs() {
600 DistributedLock lock = new DistributedLock();
601 assertNull(lock.getResourceId());
602 assertNull(lock.getOwnerKey());
603 assertNull(lock.getCallback());
604 assertEquals(0, lock.getHoldSec());
608 void testDistributedLock() {
609 assertThatIllegalArgumentException()
610 .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
611 .withMessageContaining("holdSec");
613 // should generate no exception
614 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
618 void testDistributedLockSerializable() throws Exception {
619 DistributedLock lock = getLock(RESOURCE, callback);
620 lock = roundTrip(lock);
622 assertTrue(lock.isWaiting());
624 assertEquals(RESOURCE, lock.getResourceId());
625 assertEquals(OWNER_KEY, lock.getOwnerKey());
626 assertNull(lock.getCallback());
627 assertEquals(HOLD_SEC, lock.getHoldSec());
632 lock = getLock(RESOURCE, callback);
633 assertFalse(lock.isActive());
635 // execute the doLock() call
638 assertTrue(lock.isActive());
640 // the callback for the lock should have been run in the foreground thread
641 verify(callback).lockAvailable(lock);
645 void testDistributedLockDeny() {
647 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
649 // get another lock - should fail
650 lock = getLock(RESOURCE, callback);
652 assertTrue(lock.isUnavailable());
654 // the callback for the second lock should have been run in the foreground thread
655 verify(callback).lockUnavailable(lock);
657 // should only have a request for the first lock
658 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
662 void testDistributedLockFree() {
663 lock = getLock(RESOURCE, callback);
665 assertTrue(lock.free());
666 assertTrue(lock.isUnavailable());
668 // run both requests associated with the lock
672 // should not have changed state
673 assertTrue(lock.isUnavailable());
675 // attempt to free it again
676 assertFalse(lock.free());
678 // should not have queued anything else
679 verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
681 // new lock should succeed
682 DistributedLock lock2 = getLock(RESOURCE, callback);
683 assertNotSame(lock2, lock);
684 assertTrue(lock2.isWaiting());
688 * Tests that free() works on a serialized lock with a new feature.
690 * @throws Exception if an error occurs
693 void testDistributedLockFreeSerialized() throws Exception {
694 DistributedLock lock = getLock(RESOURCE, callback);
696 feature = new MyLockingFeature(true);
698 lock = roundTrip(lock);
699 assertTrue(lock.free());
700 assertTrue(lock.isUnavailable());
704 * Tests free() on a serialized lock without a feature.
706 * @throws Exception if an error occurs
709 void testDistributedLockFreeNoFeature() throws Exception {
710 DistributedLock lock = getLock(RESOURCE, callback);
712 DistributedLockManager.setLatestInstance(null);
714 lock = roundTrip(lock);
715 assertFalse(lock.free());
716 assertTrue(lock.isUnavailable());
720 * Tests the case where the lock is freed and doUnlock called between the call to
721 * isUnavailable() and the call to compute().
724 void testDistributedLockFreeUnlocked() {
725 feature = new FreeWithFreeLockingFeature(true);
727 lock = getLock(RESOURCE, callback);
729 assertFalse(lock.free());
730 assertTrue(lock.isUnavailable());
734 * Tests the case where the lock is freed, but doUnlock is not completed, between the
735 * call to isUnavailable() and the call to compute().
738 void testDistributedLockFreeLockFreed() {
739 feature = new FreeWithFreeLockingFeature(false);
741 lock = getLock(RESOURCE, callback);
743 assertFalse(lock.free());
744 assertTrue(lock.isUnavailable());
748 void testDistributedLockExtend() {
749 lock = getLock(RESOURCE, callback);
751 // lock2 should be denied - called back by this thread
752 DistributedLock lock2 = getLock(RESOURCE, callback);
753 verify(callback, never()).lockAvailable(lock2);
754 verify(callback).lockUnavailable(lock2);
756 // lock2 will still be denied - called back by this thread
757 lock2.extend(HOLD_SEC, callback);
758 verify(callback, times(2)).lockUnavailable(lock2);
760 // force lock2 to be active - should still be denied
761 ReflectionTestUtils.setField(lock2, "state", LockState.ACTIVE);
762 lock2.extend(HOLD_SEC, callback);
763 verify(callback, times(3)).lockUnavailable(lock2);
765 assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
766 .withMessageContaining("holdSec");
770 assertTrue(lock.isActive());
772 // now extend the first lock
773 LockCallback callback2 = mock(LockCallback.class);
774 lock.extend(HOLD_SEC2, callback2);
775 assertTrue(lock.isWaiting());
777 // execute doExtend()
779 lock.extend(HOLD_SEC2, callback2);
780 assertEquals(HOLD_SEC2, lock.getHoldSec());
781 verify(callback2).lockAvailable(lock);
782 verify(callback2, never()).lockUnavailable(lock);
786 * Tests that extend() works on a serialized lock with a new feature.
788 * @throws Exception if an error occurs
791 void testDistributedLockExtendSerialized() throws Exception {
792 DistributedLock lock = getLock(RESOURCE, callback);
796 assertTrue(lock.isActive());
798 feature = new MyLockingFeature(true);
800 lock = roundTrip(lock);
801 assertTrue(lock.isActive());
803 LockCallback scallback = mock(LockCallback.class);
805 lock.extend(HOLD_SEC, scallback);
806 assertTrue(lock.isWaiting());
808 // run doExtend (in new feature)
810 assertTrue(lock.isActive());
812 verify(scallback).lockAvailable(lock);
813 verify(scallback, never()).lockUnavailable(lock);
817 * Tests extend() on a serialized lock without a feature.
819 * @throws Exception if an error occurs
822 void testDistributedLockExtendNoFeature() throws Exception {
823 DistributedLock lock = getLock(RESOURCE, callback);
827 assertTrue(lock.isActive());
829 DistributedLockManager.setLatestInstance(null);
831 lock = roundTrip(lock);
832 assertTrue(lock.isActive());
834 LockCallback scallback = mock(LockCallback.class);
836 lock.extend(HOLD_SEC, scallback);
837 assertTrue(lock.isUnavailable());
839 verify(scallback, never()).lockAvailable(lock);
840 verify(scallback).lockUnavailable(lock);
844 * Tests the case where the lock is freed and doUnlock called between the call to
845 * isUnavailable() and the call to compute().
848 void testDistributedLockExtendUnlocked() {
849 feature = new FreeWithFreeLockingFeature(true);
851 lock = getLock(RESOURCE, callback);
853 lock.extend(HOLD_SEC2, callback);
854 assertTrue(lock.isUnavailable());
855 verify(callback).lockUnavailable(lock);
859 * Tests the case where the lock is freed, but doUnlock is not completed, between the
860 * call to isUnavailable() and the call to compute().
863 void testDistributedLockExtendLockFreed() {
864 feature = new FreeWithFreeLockingFeature(false);
866 lock = getLock(RESOURCE, callback);
868 lock.extend(HOLD_SEC2, callback);
869 assertTrue(lock.isUnavailable());
870 verify(callback).lockUnavailable(lock);
874 void testDistributedLockScheduleRequest() {
875 lock = getLock(RESOURCE, callback);
878 verify(callback).lockAvailable(lock);
882 void testDistributedLockRescheduleRequest() {
883 // use a data source that throws an exception when getConnection() is called
884 InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
887 lock = getLock(RESOURCE, callback);
889 // invoke doLock - should fail and reschedule
892 // should still be waiting
893 assertTrue(lock.isWaiting());
894 verify(callback, never()).lockUnavailable(lock);
896 // free the lock while doLock is executing
897 invfeat.freeLock = true;
899 // try scheduled request - should just invoke doUnlock
902 // should still be waiting
903 assertTrue(lock.isUnavailable());
904 verify(callback, never()).lockUnavailable(lock);
906 // should have scheduled a retry of doUnlock
907 verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
911 void testDistributedLockGetNextRequest() {
912 lock = getLock(RESOURCE, callback);
915 * run doLock. This should cause getNextRequest() to be called twice, once with a
916 * request in the queue, and the second time with request=null.
922 * Tests getNextRequest(), where the same request is still in the queue the second
926 void testDistributedLockGetNextRequestSameRequest() {
927 // force reschedule to be invoked
928 feature = new InvalidDbLockingFeature(TRANSIENT);
930 lock = getLock(RESOURCE, callback);
933 * run doLock. This should cause getNextRequest() to be called twice, once with a
934 * request in the queue, and the second time with the same request again.
938 verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
942 void testDistributedLockDoRequest() {
943 lock = getLock(RESOURCE, callback);
945 assertTrue(lock.isWaiting());
947 // run doLock via doRequest
950 assertTrue(lock.isActive());
954 * Tests doRequest(), when doRequest() is already running within another thread.
957 void testDistributedLockDoRequestBusy() {
959 * this feature will invoke a request in a background thread while it's being run
960 * in a foreground thread.
962 AtomicBoolean running = new AtomicBoolean(false);
963 AtomicBoolean returned = new AtomicBoolean(false);
965 feature = new MyLockingFeature(true) {
967 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
968 LockCallback callback) {
969 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
971 private static final long serialVersionUID = 1L;
974 protected boolean doDbInsert(Connection conn) throws SQLException {
976 // already inside the thread - don't recurse any further
977 return super.doDbInsert(conn);
982 Thread thread = new Thread(() -> {
983 // run doLock from within the new thread
986 thread.setDaemon(true);
989 // wait for the background thread to complete before continuing
992 } catch (InterruptedException ignore) {
993 Thread.currentThread().interrupt();
996 returned.set(!thread.isAlive());
998 return super.doDbInsert(conn);
1004 lock = getLock(RESOURCE, callback);
1009 assertTrue(returned.get());
1013 * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1015 * @throws SQLException if an error occurs
1018 void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1019 // throw run-time exception
1020 when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1022 // use a data source that throws an exception when getConnection() is called
1023 feature = new MyLockingFeature(true) {
1025 protected BasicDataSource makeDataSource() {
1030 lock = getLock(RESOURCE, callback);
1032 // invoke doLock - should NOT reschedule
1035 assertTrue(lock.isUnavailable());
1036 verify(callback).lockUnavailable(lock);
1038 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1042 * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1045 * @throws SQLException if an error occurs
1048 void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1049 // throw run-time exception
1050 when(datasrc.getConnection()).thenAnswer(answer -> {
1052 throw new IllegalStateException(EXPECTED_EXCEPTION);
1055 // use a data source that throws an exception when getConnection() is called
1056 feature = new MyLockingFeature(true) {
1058 protected BasicDataSource makeDataSource() {
1063 lock = getLock(RESOURCE, callback);
1065 // invoke doLock - should NOT reschedule
1068 assertTrue(lock.isUnavailable());
1069 verify(callback, never()).lockUnavailable(lock);
1071 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1075 * Tests doRequest() when the retry count gets exhausted.
1078 void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1079 // use a data source that throws an exception when getConnection() is called
1080 feature = new InvalidDbLockingFeature(TRANSIENT);
1082 lock = getLock(RESOURCE, callback);
1084 // invoke doLock - should fail and reschedule
1087 // should still be waiting
1088 assertTrue(lock.isWaiting());
1089 verify(callback, never()).lockUnavailable(lock);
1091 // try again, via SCHEDULER - first retry fails
1094 // should still be waiting
1095 assertTrue(lock.isWaiting());
1096 verify(callback, never()).lockUnavailable(lock);
1098 // try again, via SCHEDULER - final retry fails
1100 assertTrue(lock.isUnavailable());
1102 // now callback should have been called
1103 verify(callback).lockUnavailable(lock);
1107 * Tests doRequest() when a non-transient DB exception is thrown.
1110 void testDistributedLockDoRequestNotTransient() {
1112 * use a data source that throws a PERMANENT exception when getConnection() is
1115 feature = new InvalidDbLockingFeature(PERMANENT);
1117 lock = getLock(RESOURCE, callback);
1119 // invoke doLock - should fail
1122 assertTrue(lock.isUnavailable());
1123 verify(callback).lockUnavailable(lock);
1125 // should not have scheduled anything new
1126 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1127 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1131 void testDistributedLockDoLock() throws SQLException {
1132 lock = getLock(RESOURCE, callback);
1134 // invoke doLock - should simply do an insert
1135 long tbegin = System.currentTimeMillis();
1138 assertEquals(1, getRecordCount());
1139 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1140 verify(callback).lockAvailable(lock);
1144 * Tests doLock() when the lock is freed before doLock runs.
1146 * @throws SQLException if an error occurs
1149 void testDistributedLockDoLockFreed() throws SQLException {
1150 lock = getLock(RESOURCE, callback);
1152 lock.setState(LockState.UNAVAILABLE);
1154 // invoke doLock - should do nothing
1157 assertEquals(0, getRecordCount());
1159 verify(callback, never()).lockAvailable(lock);
1163 * Tests doLock() when a DB exception is thrown.
1166 void testDistributedLockDoLockEx() {
1167 // use a data source that throws an exception when getConnection() is called
1168 feature = new InvalidDbLockingFeature(PERMANENT);
1170 lock = getLock(RESOURCE, callback);
1172 // invoke doLock - should simply do an insert
1175 // lock should have failed due to exception
1176 verify(callback).lockUnavailable(lock);
1180 * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1184 void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1185 // insert an expired record
1186 insertRecord(RESOURCE, feature.getUuidString(), 0);
1188 lock = getLock(RESOURCE, callback);
1190 // invoke doLock - should simply do an update
1192 verify(callback).lockAvailable(lock);
1196 * Tests doLock() when a locked record already exists.
1199 void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1200 // insert an expired record
1201 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1203 lock = getLock(RESOURCE, callback);
1208 // lock should have failed because it's already locked
1209 verify(callback).lockUnavailable(lock);
1213 void testDistributedLockDoUnlock() throws SQLException {
1214 lock = getLock(RESOURCE, callback);
1221 // invoke doUnlock()
1222 long tbegin = System.currentTimeMillis();
1225 assertEquals(0, getRecordCount());
1226 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1228 assertTrue(lock.isUnavailable());
1230 // no more callbacks should have occurred
1231 verify(callback, times(1)).lockAvailable(lock);
1232 verify(callback, never()).lockUnavailable(lock);
1236 * Tests doUnlock() when a DB exception is thrown.
1240 void testDistributedLockDoUnlockEx() {
1241 feature = new InvalidDbLockingFeature(PERMANENT);
1243 lock = getLock(RESOURCE, callback);
1245 // do NOT invoke doLock() - it will fail without a DB connection
1249 // invoke doUnlock()
1252 assertTrue(lock.isUnavailable());
1254 // no more callbacks should have occurred
1255 verify(callback, never()).lockAvailable(lock);
1256 verify(callback, never()).lockUnavailable(lock);
1260 void testDistributedLockDoExtend() throws SQLException {
1261 lock = getLock(RESOURCE, callback);
1264 LockCallback callback2 = mock(LockCallback.class);
1265 lock.extend(HOLD_SEC2, callback2);
1268 long tbegin = System.currentTimeMillis();
1271 assertEquals(1, getRecordCount());
1272 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1274 assertTrue(lock.isActive());
1276 // no more callbacks should have occurred
1277 verify(callback).lockAvailable(lock);
1278 verify(callback, never()).lockUnavailable(lock);
1280 // extension should have succeeded
1281 verify(callback2).lockAvailable(lock);
1282 verify(callback2, never()).lockUnavailable(lock);
1286 * Tests doExtend() when the lock is freed before doExtend runs.
1288 * @throws SQLException if an error occurs
1291 void testDistributedLockDoExtendFreed() throws SQLException {
1292 lock = getLock(RESOURCE, callback);
1293 lock.extend(HOLD_SEC2, callback);
1295 lock.setState(LockState.UNAVAILABLE);
1297 // invoke doExtend - should do nothing
1300 assertEquals(0, getRecordCount());
1302 verify(callback, never()).lockAvailable(lock);
1306 * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1309 * @throws SQLException if an error occurs
1312 void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1313 lock = getLock(RESOURCE, callback);
1316 LockCallback callback2 = mock(LockCallback.class);
1317 lock.extend(HOLD_SEC2, callback2);
1319 // delete the record so it's forced to re-insert it
1323 long tbegin = System.currentTimeMillis();
1326 assertEquals(1, getRecordCount());
1327 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1329 assertTrue(lock.isActive());
1331 // no more callbacks should have occurred
1332 verify(callback).lockAvailable(lock);
1333 verify(callback, never()).lockUnavailable(lock);
1335 // extension should have succeeded
1336 verify(callback2).lockAvailable(lock);
1337 verify(callback2, never()).lockUnavailable(lock);
1341 * Tests doExtend() when both update and insert fail.
1345 void testDistributedLockDoExtendNeitherSucceeds() {
1347 * this feature will create a lock that returns false when doDbUpdate() is
1348 * invoked, or when doDbInsert() is invoked a second time
1350 feature = new MyLockingFeature(true) {
1352 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1353 LockCallback callback) {
1354 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1356 private static final long serialVersionUID = 1L;
1357 private int ntimes = 0;
1360 protected boolean doDbInsert(Connection conn) throws SQLException {
1365 return super.doDbInsert(conn);
1369 protected boolean doDbUpdate(Connection conn) {
1376 lock = getLock(RESOURCE, callback);
1379 LockCallback callback2 = mock(LockCallback.class);
1380 lock.extend(HOLD_SEC2, callback2);
1385 assertTrue(lock.isUnavailable());
1387 // no more callbacks should have occurred
1388 verify(callback).lockAvailable(lock);
1389 verify(callback, never()).lockUnavailable(lock);
1391 // extension should have failed
1392 verify(callback2, never()).lockAvailable(lock);
1393 verify(callback2).lockUnavailable(lock);
1397 * Tests doExtend() when an exception occurs.
1399 * @throws SQLException if an error occurs
1402 void testDistributedLockDoExtendEx() throws SQLException {
1403 lock = getLock(RESOURCE, callback);
1407 * delete the record and insert one with a different owner, which will cause
1408 * doDbInsert() to throw an exception
1411 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1413 LockCallback callback2 = mock(LockCallback.class);
1414 lock.extend(HOLD_SEC2, callback2);
1419 assertTrue(lock.isUnavailable());
1421 // no more callbacks should have occurred
1422 verify(callback).lockAvailable(lock);
1423 verify(callback, never()).lockUnavailable(lock);
1425 // extension should have failed
1426 verify(callback2, never()).lockAvailable(lock);
1427 verify(callback2).lockUnavailable(lock);
1431 void testDistributedLockToString() {
1432 String text = getLock(RESOURCE, callback).toString();
1433 assertNotNull(text);
1434 assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1438 void testMakeThreadPool() {
1439 // use a REAL feature to test this
1440 feature = new DistributedLockManager();
1442 // this should create a thread pool
1443 feature.beforeCreateLockManager(engine, new Properties());
1444 feature.afterStart(engine);
1446 assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
1450 * Performs a multithreaded test of the locking facility.
1452 * @throws InterruptedException if the current thread is interrupted while waiting for
1453 * the background threads to complete
1456 void testMultiThreaded() throws InterruptedException {
1457 ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1459 feature = new DistributedLockManager();
1460 feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1461 feature.afterStart(PolicyEngineConstants.getManager());
1463 List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1464 for (int x = 0; x < MAX_THREADS; ++x) {
1465 threads.add(new MyThread());
1468 threads.forEach(Thread::start);
1470 for (MyThread thread : threads) {
1472 assertFalse(thread.isAlive());
1475 for (MyThread thread : threads) {
1476 if (thread.err != null) {
1481 assertTrue(nsuccesses.get() > 0);
1484 private DistributedLock getLock(String resource, LockCallback callback) {
1485 return (DistributedLock) feature.createLock(resource, DistributedLockManagerTest.OWNER_KEY,
1486 DistributedLockManagerTest.HOLD_SEC, callback, false);
1489 private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1490 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1491 try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1492 oos.writeObject(lock);
1495 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1496 try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1497 return (DistributedLock) ois.readObject();
1502 * Runs the checkExpired() action.
1504 * @param nskip number of actions in the work queue to skip
1505 * @param schedSec number of seconds for which the checker should have been scheduled
1507 private void runChecker(int nskip, long schedSec) {
1508 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1509 verify(exsvc, times(nskip + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1510 Runnable action = captor.getAllValues().get(nskip);
1515 * Runs a lock action (e.g., doLock, doUnlock).
1517 * @param nskip number of actions in the work queue to skip
1518 * @param nadditional number of additional actions that appear in the work queue
1519 * <i>after</i> the desired action
1521 void runLock(int nskip, int nadditional) {
1522 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1523 verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1525 Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1530 * Runs a scheduled action (e.g., "retry" action).
1532 * @param nskip number of actions in the work queue to skip
1534 void runSchedule(int nskip) {
1535 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1536 verify(exsvc, times(PRE_SCHED_EXECS + nskip + 1)).schedule(captor.capture(), anyLong(), any());
1538 Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1543 * Gets a count of the number of lock records in the DB.
1545 * @return the number of lock records in the DB
1546 * @throws SQLException if an error occurs accessing the DB
1548 private int getRecordCount() throws SQLException {
1549 try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1550 ResultSet result = stmt.executeQuery()) {
1552 if (result.next()) {
1553 return result.getInt(1);
1562 * Determines if there is a record for the given resource whose expiration time is in
1563 * the expected range.
1565 * @param resourceId ID of the resource of interest
1566 * @param uuidString UUID string of the owner
1567 * @param holdSec seconds for which the lock was to be held
1568 * @param tbegin earliest time, in milliseconds, at which the record could have been
1569 * inserted into the DB
1570 * @return {@code true} if a record is found, {@code false} otherwise
1571 * @throws SQLException if an error occurs accessing the DB
1573 private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1574 try (PreparedStatement stmt =
1575 conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1576 + " WHERE resourceId=? AND host=? AND owner=?")) {
1578 stmt.setString(1, resourceId);
1579 stmt.setString(2, feature.getPdpName());
1580 stmt.setString(3, uuidString);
1582 try (ResultSet result = stmt.executeQuery()) {
1583 if (result.next()) {
1584 int remaining = result.getInt(1);
1585 long maxDiff = System.currentTimeMillis() - tbegin;
1586 return (remaining >= 0 && holdSec - remaining <= maxDiff);
1596 * Inserts a record into the DB.
1598 * @param resourceId ID of the resource of interest
1599 * @param uuidString UUID string of the owner
1600 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1601 * @throws SQLException if an error occurs accessing the DB
1603 private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1604 this.insertRecord(resourceId, feature.getPdpName(), uuidString, expireOffset);
1607 private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1608 throws SQLException {
1609 try (PreparedStatement stmt =
1610 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1611 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1613 stmt.setString(1, resourceId);
1614 stmt.setString(2, hostName);
1615 stmt.setString(3, uuidString);
1616 stmt.setInt(4, expireOffset);
1618 assertEquals(1, stmt.executeUpdate());
1623 * Updates a record in the DB.
1625 * @param resourceId ID of the resource of interest
1626 * @param newUuid UUID string of the <i>new</i> owner
1627 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1628 * @throws SQLException if an error occurs accessing the DB
1630 private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1631 try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1632 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1634 stmt.setString(1, newHost);
1635 stmt.setString(2, newUuid);
1636 stmt.setInt(3, expireOffset);
1637 stmt.setString(4, resourceId);
1639 assertEquals(1, stmt.executeUpdate());
1644 * Feature that uses <i>exsvc</i> to execute requests.
1646 private class MyLockingFeature extends DistributedLockManager {
1648 public MyLockingFeature(boolean init) {
1651 exsvc = mock(ScheduledExecutorService.class);
1652 lenient().when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1653 ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1656 beforeCreateLockManager(engine, new Properties());
1664 * Feature whose data source all throws exceptions.
1666 private class InvalidDbLockingFeature extends MyLockingFeature {
1667 private final boolean isTransient;
1668 private boolean freeLock = false;
1670 InvalidDbLockingFeature(boolean isTransient) {
1671 // pass "false" because we have to set the error code BEFORE calling
1675 this.isTransient = isTransient;
1677 this.beforeCreateLockManager(engine, new Properties());
1679 this.afterStart(engine);
1683 protected BasicDataSource makeDataSource() throws Exception {
1684 lenient().when(datasrc.getConnection()).thenAnswer(answer -> {
1693 doThrow(makeEx()).when(datasrc).close();
1698 protected SQLException makeEx() {
1700 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1703 return new SQLException(EXPECTED_EXCEPTION);
1709 * Feature whose locks free themselves while free() is already running.
1711 private class FreeWithFreeLockingFeature extends MyLockingFeature {
1712 private final boolean relock;
1714 public FreeWithFreeLockingFeature(boolean relock) {
1716 this.relock = relock;
1720 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1721 LockCallback callback) {
1723 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1725 private static final long serialVersionUID = 1L;
1726 private boolean checked = false;
1729 public boolean isUnavailable() {
1731 return super.isUnavailable();
1736 // release and relock
1744 createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1754 * Thread used with the multithreaded test. It repeatedly attempts to get a lock,
1755 * extend it, and then unlock it.
1757 private class MyThread extends Thread {
1758 AssertionError err = null;
1767 for (int x = 0; x < MAX_LOOPS; ++x) {
1771 } catch (AssertionError e) {
1776 private void makeAttempt() {
1778 Semaphore sem = new Semaphore(0);
1780 LockCallback cb = new LockCallback() {
1782 public void lockAvailable(Lock lock) {
1787 public void lockUnavailable(Lock lock) {
1792 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1794 // wait for callback, whether available or unavailable
1795 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1796 if (!lock.isActive()) {
1800 nsuccesses.incrementAndGet();
1802 assertEquals(1, nactive.incrementAndGet());
1804 lock.extend(HOLD_SEC2, cb);
1805 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1806 assertTrue(lock.isActive());
1808 // decrement BEFORE free()
1809 nactive.decrementAndGet();
1811 assertTrue(lock.free());
1812 assertTrue(lock.isUnavailable());
1814 } catch (InterruptedException e) {
1815 Thread.currentThread().interrupt();
1816 throw new AssertionError("interrupted", e);