2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023-2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.distributed.locking;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
27 import static org.assertj.core.api.Assertions.assertThatThrownBy;
28 import static org.junit.jupiter.api.Assertions.assertEquals;
29 import static org.junit.jupiter.api.Assertions.assertFalse;
30 import static org.junit.jupiter.api.Assertions.assertNotNull;
31 import static org.junit.jupiter.api.Assertions.assertNotSame;
32 import static org.junit.jupiter.api.Assertions.assertNull;
33 import static org.junit.jupiter.api.Assertions.assertSame;
34 import static org.junit.jupiter.api.Assertions.assertTrue;
35 import static org.mockito.ArgumentMatchers.any;
36 import static org.mockito.ArgumentMatchers.anyBoolean;
37 import static org.mockito.ArgumentMatchers.anyLong;
38 import static org.mockito.ArgumentMatchers.eq;
39 import static org.mockito.Mockito.doThrow;
40 import static org.mockito.Mockito.lenient;
41 import static org.mockito.Mockito.mock;
42 import static org.mockito.Mockito.never;
43 import static org.mockito.Mockito.times;
44 import static org.mockito.Mockito.verify;
45 import static org.mockito.Mockito.when;
47 import java.io.ByteArrayInputStream;
48 import java.io.ByteArrayOutputStream;
49 import java.io.ObjectInputStream;
50 import java.io.ObjectOutputStream;
51 import java.sql.Connection;
52 import java.sql.DriverManager;
53 import java.sql.PreparedStatement;
54 import java.sql.ResultSet;
55 import java.sql.SQLException;
56 import java.sql.SQLTransientException;
57 import java.util.ArrayList;
58 import java.util.List;
59 import java.util.Properties;
60 import java.util.concurrent.Executors;
61 import java.util.concurrent.RejectedExecutionException;
62 import java.util.concurrent.ScheduledExecutorService;
63 import java.util.concurrent.ScheduledFuture;
64 import java.util.concurrent.Semaphore;
65 import java.util.concurrent.TimeUnit;
66 import java.util.concurrent.atomic.AtomicBoolean;
67 import java.util.concurrent.atomic.AtomicInteger;
68 import java.util.concurrent.atomic.AtomicReference;
69 import org.apache.commons.dbcp2.BasicDataSource;
70 import org.junit.jupiter.api.AfterAll;
71 import org.junit.jupiter.api.AfterEach;
72 import org.junit.jupiter.api.BeforeAll;
73 import org.junit.jupiter.api.BeforeEach;
74 import org.junit.jupiter.api.Test;
75 import org.junit.jupiter.api.extension.ExtendWith;
76 import org.kie.api.runtime.KieSession;
77 import org.mockito.ArgumentCaptor;
78 import org.mockito.Mock;
79 import org.mockito.MockitoAnnotations;
80 import org.mockito.junit.jupiter.MockitoExtension;
81 import org.onap.policy.common.utils.services.OrderedServiceImpl;
82 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
83 import org.onap.policy.drools.core.PolicySession;
84 import org.onap.policy.drools.core.lock.Lock;
85 import org.onap.policy.drools.core.lock.LockCallback;
86 import org.onap.policy.drools.core.lock.LockState;
87 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
88 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
89 import org.onap.policy.drools.system.PolicyEngine;
90 import org.onap.policy.drools.system.PolicyEngineConstants;
91 import org.springframework.test.util.ReflectionTestUtils;
93 @ExtendWith(MockitoExtension.class)
94 class DistributedLockManagerTest {
95 private static final long EXPIRE_SEC = 900L;
96 private static final long RETRY_SEC = 60L;
97 private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
98 private static final String OTHER_HOST = "other-host";
99 private static final String OTHER_OWNER = "other-owner";
100 private static final String EXPECTED_EXCEPTION = "expected exception";
101 private static final String DB_CONNECTION =
102 "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
103 private static final String DB_USER = "user";
104 private static final String DB_PASSWORD = "password";
105 private static final String OWNER_KEY = "my key";
106 private static final String RESOURCE = "my resource";
107 private static final String RESOURCE2 = "my resource #2";
108 private static final String RESOURCE3 = "my resource #3";
109 private static final String RESOURCE4 = "my resource #4";
110 private static final String RESOURCE5 = "my resource #5";
111 private static final int HOLD_SEC = 100;
112 private static final int HOLD_SEC2 = 120;
113 private static final int MAX_THREADS = 5;
114 private static final int MAX_LOOPS = 100;
115 private static final boolean TRANSIENT = true;
116 private static final boolean PERMANENT = false;
118 // number of execute() calls before the first lock attempt
119 private static final int PRE_LOCK_EXECS = 1;
121 // number of execute() calls before the first schedule attempt
122 private static final int PRE_SCHED_EXECS = 1;
124 private static Connection conn = null;
125 private static ScheduledExecutorService saveExec;
126 private static ScheduledExecutorService realExec;
129 private PolicyEngine engine;
132 private KieSession kieSess;
135 private ScheduledExecutorService exsvc;
138 private ScheduledFuture<?> checker;
141 private LockCallback callback;
144 private BasicDataSource datasrc;
146 private DistributedLock lock;
148 private AtomicInteger nactive;
149 private AtomicInteger nsuccesses;
150 private DistributedLockManager feature;
152 AutoCloseable closeable;
155 * Configures the location of the property files and creates the DB.
157 * @throws SQLException if the DB cannot be created
160 static void setUpBeforeClass() throws SQLException {
161 SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
162 PolicyEngineConstants.getManager().configure(new Properties());
164 conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
166 try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
167 + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
168 + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
169 createStmt.executeUpdate();
172 saveExec = (ScheduledExecutorService) ReflectionTestUtils.getField(PolicyEngineConstants.getManager(),
173 POLICY_ENGINE_EXECUTOR_FIELD);
175 realExec = Executors.newScheduledThreadPool(3);
179 * Restores static fields.
182 static void tearDownAfterClass() throws SQLException {
183 ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
189 * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
192 * @throws SQLException if the lock records cannot be deleted from the DB
195 void setUp() throws SQLException {
196 closeable = MockitoAnnotations.openMocks(this);
197 // grant() and deny() calls will come through here and be immediately executed
198 PolicySession session = new PolicySession(null, null, kieSess) {
200 public void insertDrools(Object object) {
201 ((Runnable) object).run();
205 session.setPolicySession();
207 nactive = new AtomicInteger(0);
208 nsuccesses = new AtomicInteger(0);
212 feature = new MyLockingFeature(true);
216 void tearDown() throws Exception {
222 private void cleanDb() throws SQLException {
223 try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
224 stmt.executeUpdate();
228 private void shutdownFeature() {
229 if (feature != null) {
230 feature.afterStop(engine);
236 * Tests that the feature is found in the expected service sets.
239 void testServiceApis() {
240 assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
241 .anyMatch(obj -> obj instanceof DistributedLockManager));
245 void testGetSequenceNumber() {
246 assertEquals(1000, feature.getSequenceNumber());
250 void testBeforeCreateLockManager() {
251 assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
255 * Tests beforeCreate(), when getProperties() throws a runtime exception.
258 void testBeforeCreateLockManagerEx() {
261 feature = new MyLockingFeature(false) {
263 protected Properties getProperties(String fileName) {
264 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
268 Properties props = new Properties();
269 assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, props))
270 .isInstanceOf(DistributedLockManagerException.class);
274 void testAfterStart() {
275 // verify that cleanup & expire check are both added to the queue
276 verify(exsvc).execute(any());
277 verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
281 * Tests afterStart(), when thread pool throws a runtime exception.
284 void testAfterStartExInThreadPool() {
287 feature = new MyLockingFeature(false);
289 doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
291 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
295 void testDeleteExpiredDbLocks() throws SQLException {
296 // add records: two expired, one not
297 insertRecord(RESOURCE, feature.getUuidString(), -1);
298 insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
299 insertRecord(RESOURCE3, OTHER_OWNER, 0);
300 insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
302 // get the clean-up function and execute it
303 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
304 verify(exsvc).execute(captor.capture());
306 long tbegin = System.currentTimeMillis();
307 Runnable action = captor.getValue();
310 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
311 assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
312 assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
313 assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
315 assertEquals(2, getRecordCount());
319 * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
323 void testDeleteExpiredDbLocksEx() {
324 feature = new InvalidDbLockingFeature(TRANSIENT);
326 // get the clean-up function and execute it
327 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
328 verify(exsvc).execute(captor.capture());
330 Runnable action = captor.getValue();
332 // should not throw an exception
337 void testAfterStop() {
339 verify(checker).cancel(anyBoolean());
341 feature = new DistributedLockManager();
343 // shutdown without calling afterStart()
349 * Tests afterStop(), when the data source throws an exception when close() is called.
353 void testAfterStopEx() {
356 // use a data source that throws an exception when closed
357 feature = new InvalidDbLockingFeature(TRANSIENT);
359 assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
363 void testCreateLock() throws SQLException {
364 verify(exsvc).execute(any());
366 lock = getLock(RESOURCE, callback);
367 assertTrue(lock.isWaiting());
369 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
371 // this lock should fail
372 LockCallback callback2 = mock(LockCallback.class);
373 DistributedLock lock2 = getLock(RESOURCE, callback2);
374 assertTrue(lock2.isUnavailable());
375 verify(callback2, never()).lockAvailable(lock2);
376 verify(callback2).lockUnavailable(lock2);
378 // this should fail, too
379 LockCallback callback3 = mock(LockCallback.class);
380 DistributedLock lock3 = getLock(RESOURCE, callback3);
381 assertTrue(lock3.isUnavailable());
382 verify(callback3, never()).lockAvailable(lock3);
383 verify(callback3).lockUnavailable(lock3);
385 // no change to first
386 assertTrue(lock.isWaiting());
388 // no callbacks to the first lock
389 verify(callback, never()).lockAvailable(lock);
390 verify(callback, never()).lockUnavailable(lock);
392 assertTrue(lock.isWaiting());
393 assertEquals(0, getRecordCount());
396 assertTrue(lock.isActive());
397 assertEquals(1, getRecordCount());
399 verify(callback).lockAvailable(lock);
400 verify(callback, never()).lockUnavailable(lock);
402 // this should succeed
403 DistributedLock lock4 = getLock(RESOURCE2, callback);
404 assertTrue(lock4.isWaiting());
406 // after running checker, original records should still remain
407 runChecker(0, EXPIRE_SEC);
408 assertEquals(1, getRecordCount());
409 verify(callback, never()).lockUnavailable(lock);
413 * Tests createLock() when the feature is not the latest instance.
416 void testCreateLockNotLatestInstance() {
417 DistributedLockManager.setLatestInstance(null);
419 Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
420 assertTrue(lock.isUnavailable());
421 verify(callback, never()).lockAvailable(any());
422 verify(callback).lockUnavailable(lock);
426 void testCheckExpired() throws SQLException {
427 lock = getLock(RESOURCE, callback);
430 LockCallback callback2 = mock(LockCallback.class);
431 final DistributedLock lock2 = getLock(RESOURCE2, callback2);
434 LockCallback callback3 = mock(LockCallback.class);
435 final DistributedLock lock3 = getLock(RESOURCE3, callback3);
438 LockCallback callback4 = mock(LockCallback.class);
439 final DistributedLock lock4 = getLock(RESOURCE4, callback4);
442 LockCallback callback5 = mock(LockCallback.class);
443 final DistributedLock lock5 = getLock(RESOURCE5, callback5);
446 assertEquals(5, getRecordCount());
449 updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
451 // change host of another record
452 updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
454 // change uuid of another record
455 updateRecord(RESOURCE5, feature.getPdpName(), OTHER_OWNER, HOLD_SEC);
458 runChecker(0, EXPIRE_SEC);
461 assertTrue(lock.isUnavailable());
462 assertTrue(lock2.isActive());
463 assertTrue(lock3.isUnavailable());
464 assertTrue(lock4.isActive());
465 assertTrue(lock5.isUnavailable());
471 verify(callback).lockUnavailable(lock);
472 verify(callback3).lockUnavailable(lock3);
473 verify(callback5).lockUnavailable(lock5);
475 verify(callback2, never()).lockUnavailable(lock2);
476 verify(callback4, never()).lockUnavailable(lock4);
478 // another check should have been scheduled, with the normal interval
479 runChecker(1, EXPIRE_SEC);
483 * Tests checkExpired(), when schedule() throws an exception.
486 void testCheckExpiredExecRejected() {
487 // arrange for execution to be rejected
488 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
489 .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
491 runChecker(0, EXPIRE_SEC);
495 * Tests checkExpired(), when getConnection() throws an exception.
498 void testCheckExpiredSqlEx() {
499 // use a data source that throws an exception when getConnection() is called
500 feature = new InvalidDbLockingFeature(TRANSIENT);
502 runChecker(0, EXPIRE_SEC);
504 // it should have scheduled another check, sooner
505 runChecker(0, RETRY_SEC);
509 * Tests checkExpired(), when getConnection() throws an exception and the feature is
513 void testCheckExpiredSqlExFeatureStopped() {
514 // use a data source that throws an exception when getConnection() is called
515 feature = new InvalidDbLockingFeature(TRANSIENT) {
517 protected SQLException makeEx() {
519 return super.makeEx();
523 runChecker(0, EXPIRE_SEC);
525 // it should NOT have scheduled another check
526 verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
530 void testExpireLocks() throws SQLException {
531 AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
533 feature = new MyLockingFeature(true) {
535 protected BasicDataSource makeDataSource() throws Exception {
536 // get the real data source
537 BasicDataSource src2 = super.makeDataSource();
539 when(datasrc.getConnection()).thenAnswer(answer -> {
540 DistributedLock lck = freeLock.getAndSet(null);
549 return src2.getConnection();
556 lock = getLock(RESOURCE, callback);
559 LockCallback callback2 = mock(LockCallback.class);
560 final DistributedLock lock2 = getLock(RESOURCE2, callback2);
563 LockCallback callback3 = mock(LockCallback.class);
564 final DistributedLock lock3 = getLock(RESOURCE3, callback3);
565 // don't run doLock for lock3 - leave it in the waiting state
567 LockCallback callback4 = mock(LockCallback.class);
568 final DistributedLock lock4 = getLock(RESOURCE4, callback4);
571 assertEquals(3, getRecordCount());
574 updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
576 // arrange to free lock4 while the checker is running
580 runChecker(0, EXPIRE_SEC);
583 assertTrue(lock.isUnavailable());
584 assertTrue(lock2.isActive());
585 assertTrue(lock3.isWaiting());
586 assertTrue(lock4.isUnavailable());
589 verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
591 verify(callback).lockUnavailable(lock);
592 verify(callback2, never()).lockUnavailable(lock2);
593 verify(callback3, never()).lockUnavailable(lock3);
594 verify(callback4, never()).lockUnavailable(lock4);
598 void testDistributedLockNoArgs() {
599 DistributedLock lock = new DistributedLock();
600 assertNull(lock.getResourceId());
601 assertNull(lock.getOwnerKey());
602 assertNull(lock.getCallback());
603 assertEquals(0, lock.getHoldSec());
607 void testDistributedLock() {
608 assertThatIllegalArgumentException()
609 .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
610 .withMessageContaining("holdSec");
612 // should generate no exception
613 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
617 void testDistributedLockSerializable() throws Exception {
618 DistributedLock lock = getLock(RESOURCE, callback);
619 lock = roundTrip(lock);
621 assertTrue(lock.isWaiting());
623 assertEquals(RESOURCE, lock.getResourceId());
624 assertEquals(OWNER_KEY, lock.getOwnerKey());
625 assertNull(lock.getCallback());
626 assertEquals(HOLD_SEC, lock.getHoldSec());
631 lock = getLock(RESOURCE, callback);
632 assertFalse(lock.isActive());
634 // execute the doLock() call
637 assertTrue(lock.isActive());
639 // the callback for the lock should have been run in the foreground thread
640 verify(callback).lockAvailable(lock);
644 void testDistributedLockDeny() {
646 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
648 // get another lock - should fail
649 lock = getLock(RESOURCE, callback);
651 assertTrue(lock.isUnavailable());
653 // the callback for the second lock should have been run in the foreground thread
654 verify(callback).lockUnavailable(lock);
656 // should only have a request for the first lock
657 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
661 void testDistributedLockFree() {
662 lock = getLock(RESOURCE, callback);
664 assertTrue(lock.free());
665 assertTrue(lock.isUnavailable());
667 // run both requests associated with the lock
671 // should not have changed state
672 assertTrue(lock.isUnavailable());
674 // attempt to free it again
675 assertFalse(lock.free());
677 // should not have queued anything else
678 verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
680 // new lock should succeed
681 DistributedLock lock2 = getLock(RESOURCE, callback);
682 assertNotSame(lock2, lock);
683 assertTrue(lock2.isWaiting());
687 * Tests that free() works on a serialized lock with a new feature.
689 * @throws Exception if an error occurs
692 void testDistributedLockFreeSerialized() throws Exception {
693 DistributedLock lock = getLock(RESOURCE, callback);
695 feature = new MyLockingFeature(true);
697 lock = roundTrip(lock);
698 assertTrue(lock.free());
699 assertTrue(lock.isUnavailable());
703 * Tests free() on a serialized lock without a feature.
705 * @throws Exception if an error occurs
708 void testDistributedLockFreeNoFeature() throws Exception {
709 DistributedLock lock = getLock(RESOURCE, callback);
711 DistributedLockManager.setLatestInstance(null);
713 lock = roundTrip(lock);
714 assertFalse(lock.free());
715 assertTrue(lock.isUnavailable());
719 * Tests the case where the lock is freed and doUnlock called between the call to
720 * isUnavailable() and the call to compute().
723 void testDistributedLockFreeUnlocked() {
724 feature = new FreeWithFreeLockingFeature(true);
726 lock = getLock(RESOURCE, callback);
728 assertFalse(lock.free());
729 assertTrue(lock.isUnavailable());
733 * Tests the case where the lock is freed, but doUnlock is not completed, between the
734 * call to isUnavailable() and the call to compute().
737 void testDistributedLockFreeLockFreed() {
738 feature = new FreeWithFreeLockingFeature(false);
740 lock = getLock(RESOURCE, callback);
742 assertFalse(lock.free());
743 assertTrue(lock.isUnavailable());
747 void testDistributedLockExtend() {
748 lock = getLock(RESOURCE, callback);
750 // lock2 should be denied - called back by this thread
751 DistributedLock lock2 = getLock(RESOURCE, callback);
752 verify(callback, never()).lockAvailable(lock2);
753 verify(callback).lockUnavailable(lock2);
755 // lock2 will still be denied - called back by this thread
756 lock2.extend(HOLD_SEC, callback);
757 verify(callback, times(2)).lockUnavailable(lock2);
759 // force lock2 to be active - should still be denied
760 ReflectionTestUtils.setField(lock2, "state", LockState.ACTIVE);
761 lock2.extend(HOLD_SEC, callback);
762 verify(callback, times(3)).lockUnavailable(lock2);
764 assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
765 .withMessageContaining("holdSec");
769 assertTrue(lock.isActive());
771 // now extend the first lock
772 LockCallback callback2 = mock(LockCallback.class);
773 lock.extend(HOLD_SEC2, callback2);
774 assertTrue(lock.isWaiting());
776 // execute doExtend()
778 lock.extend(HOLD_SEC2, callback2);
779 assertEquals(HOLD_SEC2, lock.getHoldSec());
780 verify(callback2).lockAvailable(lock);
781 verify(callback2, never()).lockUnavailable(lock);
785 * Tests that extend() works on a serialized lock with a new feature.
787 * @throws Exception if an error occurs
790 void testDistributedLockExtendSerialized() throws Exception {
791 DistributedLock lock = getLock(RESOURCE, callback);
795 assertTrue(lock.isActive());
797 feature = new MyLockingFeature(true);
799 lock = roundTrip(lock);
800 assertTrue(lock.isActive());
802 LockCallback scallback = mock(LockCallback.class);
804 lock.extend(HOLD_SEC, scallback);
805 assertTrue(lock.isWaiting());
807 // run doExtend (in new feature)
809 assertTrue(lock.isActive());
811 verify(scallback).lockAvailable(lock);
812 verify(scallback, never()).lockUnavailable(lock);
816 * Tests extend() on a serialized lock without a feature.
818 * @throws Exception if an error occurs
821 void testDistributedLockExtendNoFeature() throws Exception {
822 DistributedLock lock = getLock(RESOURCE, callback);
826 assertTrue(lock.isActive());
828 DistributedLockManager.setLatestInstance(null);
830 lock = roundTrip(lock);
831 assertTrue(lock.isActive());
833 LockCallback scallback = mock(LockCallback.class);
835 lock.extend(HOLD_SEC, scallback);
836 assertTrue(lock.isUnavailable());
838 verify(scallback, never()).lockAvailable(lock);
839 verify(scallback).lockUnavailable(lock);
843 * Tests the case where the lock is freed and doUnlock called between the call to
844 * isUnavailable() and the call to compute().
847 void testDistributedLockExtendUnlocked() {
848 feature = new FreeWithFreeLockingFeature(true);
850 lock = getLock(RESOURCE, callback);
852 lock.extend(HOLD_SEC2, callback);
853 assertTrue(lock.isUnavailable());
854 verify(callback).lockUnavailable(lock);
858 * Tests the case where the lock is freed, but doUnlock is not completed, between the
859 * call to isUnavailable() and the call to compute().
862 void testDistributedLockExtendLockFreed() {
863 feature = new FreeWithFreeLockingFeature(false);
865 lock = getLock(RESOURCE, callback);
867 lock.extend(HOLD_SEC2, callback);
868 assertTrue(lock.isUnavailable());
869 verify(callback).lockUnavailable(lock);
873 void testDistributedLockScheduleRequest() {
874 lock = getLock(RESOURCE, callback);
877 verify(callback).lockAvailable(lock);
881 void testDistributedLockRescheduleRequest() {
882 // use a data source that throws an exception when getConnection() is called
883 InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
886 lock = getLock(RESOURCE, callback);
888 // invoke doLock - should fail and reschedule
891 // should still be waiting
892 assertTrue(lock.isWaiting());
893 verify(callback, never()).lockUnavailable(lock);
895 // free the lock while doLock is executing
896 invfeat.freeLock = true;
898 // try scheduled request - should just invoke doUnlock
901 // should still be waiting
902 assertTrue(lock.isUnavailable());
903 verify(callback, never()).lockUnavailable(lock);
905 // should have scheduled a retry of doUnlock
906 verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
910 void testDistributedLockGetNextRequest() {
911 lock = getLock(RESOURCE, callback);
914 * run doLock. This should cause getNextRequest() to be called twice, once with a
915 * request in the queue, and the second time with request=null.
921 * Tests getNextRequest(), where the same request is still in the queue the second
925 void testDistributedLockGetNextRequestSameRequest() {
926 // force reschedule to be invoked
927 feature = new InvalidDbLockingFeature(TRANSIENT);
929 lock = getLock(RESOURCE, callback);
932 * run doLock. This should cause getNextRequest() to be called twice, once with a
933 * request in the queue, and the second time with the same request again.
937 verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
941 void testDistributedLockDoRequest() {
942 lock = getLock(RESOURCE, callback);
944 assertTrue(lock.isWaiting());
946 // run doLock via doRequest
949 assertTrue(lock.isActive());
953 * Tests doRequest(), when doRequest() is already running within another thread.
956 void testDistributedLockDoRequestBusy() {
958 * this feature will invoke a request in a background thread while it's being run
959 * in a foreground thread.
961 AtomicBoolean running = new AtomicBoolean(false);
962 AtomicBoolean returned = new AtomicBoolean(false);
964 feature = new MyLockingFeature(true) {
966 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
967 LockCallback callback) {
968 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
969 private static final long serialVersionUID = 1L;
972 protected boolean doDbInsert(Connection conn) throws SQLException {
974 // already inside the thread - don't recurse any further
975 return super.doDbInsert(conn);
980 Thread thread = new Thread(() -> {
981 // run doLock from within the new thread
984 thread.setDaemon(true);
987 // wait for the background thread to complete before continuing
990 } catch (InterruptedException ignore) {
991 Thread.currentThread().interrupt();
994 returned.set(!thread.isAlive());
996 return super.doDbInsert(conn);
1002 lock = getLock(RESOURCE, callback);
1007 assertTrue(returned.get());
1011 * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1013 * @throws SQLException if an error occurs
1016 void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1017 // throw run-time exception
1018 when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1020 // use a data source that throws an exception when getConnection() is called
1021 feature = new MyLockingFeature(true) {
1023 protected BasicDataSource makeDataSource() {
1028 lock = getLock(RESOURCE, callback);
1030 // invoke doLock - should NOT reschedule
1033 assertTrue(lock.isUnavailable());
1034 verify(callback).lockUnavailable(lock);
1036 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1040 * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1043 * @throws SQLException if an error occurs
1046 void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1047 // throw run-time exception
1048 when(datasrc.getConnection()).thenAnswer(answer -> {
1050 throw new IllegalStateException(EXPECTED_EXCEPTION);
1053 // use a data source that throws an exception when getConnection() is called
1054 feature = new MyLockingFeature(true) {
1056 protected BasicDataSource makeDataSource() {
1061 lock = getLock(RESOURCE, callback);
1063 // invoke doLock - should NOT reschedule
1066 assertTrue(lock.isUnavailable());
1067 verify(callback, never()).lockUnavailable(lock);
1069 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1073 * Tests doRequest() when the retry count gets exhausted.
1076 void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1077 // use a data source that throws an exception when getConnection() is called
1078 feature = new InvalidDbLockingFeature(TRANSIENT);
1080 lock = getLock(RESOURCE, callback);
1082 // invoke doLock - should fail and reschedule
1085 // should still be waiting
1086 assertTrue(lock.isWaiting());
1087 verify(callback, never()).lockUnavailable(lock);
1089 // try again, via SCHEDULER - first retry fails
1092 // should still be waiting
1093 assertTrue(lock.isWaiting());
1094 verify(callback, never()).lockUnavailable(lock);
1096 // try again, via SCHEDULER - final retry fails
1098 assertTrue(lock.isUnavailable());
1100 // now callback should have been called
1101 verify(callback).lockUnavailable(lock);
1105 * Tests doRequest() when a non-transient DB exception is thrown.
1108 void testDistributedLockDoRequestNotTransient() {
1110 * use a data source that throws a PERMANENT exception when getConnection() is
1113 feature = new InvalidDbLockingFeature(PERMANENT);
1115 lock = getLock(RESOURCE, callback);
1117 // invoke doLock - should fail
1120 assertTrue(lock.isUnavailable());
1121 verify(callback).lockUnavailable(lock);
1123 // should not have scheduled anything new
1124 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1125 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1129 void testDistributedLockDoLock() throws SQLException {
1130 lock = getLock(RESOURCE, callback);
1132 // invoke doLock - should simply do an insert
1133 long tbegin = System.currentTimeMillis();
1136 assertEquals(1, getRecordCount());
1137 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1138 verify(callback).lockAvailable(lock);
1142 * Tests doLock() when the lock is freed before doLock runs.
1144 * @throws SQLException if an error occurs
1147 void testDistributedLockDoLockFreed() throws SQLException {
1148 lock = getLock(RESOURCE, callback);
1150 lock.setState(LockState.UNAVAILABLE);
1152 // invoke doLock - should do nothing
1155 assertEquals(0, getRecordCount());
1157 verify(callback, never()).lockAvailable(lock);
1161 * Tests doLock() when a DB exception is thrown.
1164 void testDistributedLockDoLockEx() {
1165 // use a data source that throws an exception when getConnection() is called
1166 feature = new InvalidDbLockingFeature(PERMANENT);
1168 lock = getLock(RESOURCE, callback);
1170 // invoke doLock - should simply do an insert
1173 // lock should have failed due to exception
1174 verify(callback).lockUnavailable(lock);
1178 * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1182 void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1183 // insert an expired record
1184 insertRecord(RESOURCE, feature.getUuidString(), 0);
1186 lock = getLock(RESOURCE, callback);
1188 // invoke doLock - should simply do an update
1190 verify(callback).lockAvailable(lock);
1194 * Tests doLock() when a locked record already exists.
1197 void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1198 // insert an expired record
1199 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1201 lock = getLock(RESOURCE, callback);
1206 // lock should have failed because it's already locked
1207 verify(callback).lockUnavailable(lock);
1211 void testDistributedLockDoUnlock() throws SQLException {
1212 lock = getLock(RESOURCE, callback);
1219 // invoke doUnlock()
1220 long tbegin = System.currentTimeMillis();
1223 assertEquals(0, getRecordCount());
1224 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1226 assertTrue(lock.isUnavailable());
1228 // no more callbacks should have occurred
1229 verify(callback, times(1)).lockAvailable(lock);
1230 verify(callback, never()).lockUnavailable(lock);
1234 * Tests doUnlock() when a DB exception is thrown.
1238 void testDistributedLockDoUnlockEx() {
1239 feature = new InvalidDbLockingFeature(PERMANENT);
1241 lock = getLock(RESOURCE, callback);
1243 // do NOT invoke doLock() - it will fail without a DB connection
1247 // invoke doUnlock()
1250 assertTrue(lock.isUnavailable());
1252 // no more callbacks should have occurred
1253 verify(callback, never()).lockAvailable(lock);
1254 verify(callback, never()).lockUnavailable(lock);
1258 void testDistributedLockDoExtend() throws SQLException {
1259 lock = getLock(RESOURCE, callback);
1262 LockCallback callback2 = mock(LockCallback.class);
1263 lock.extend(HOLD_SEC2, callback2);
1266 long tbegin = System.currentTimeMillis();
1269 assertEquals(1, getRecordCount());
1270 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1272 assertTrue(lock.isActive());
1274 // no more callbacks should have occurred
1275 verify(callback).lockAvailable(lock);
1276 verify(callback, never()).lockUnavailable(lock);
1278 // extension should have succeeded
1279 verify(callback2).lockAvailable(lock);
1280 verify(callback2, never()).lockUnavailable(lock);
1284 * Tests doExtend() when the lock is freed before doExtend runs.
1286 * @throws SQLException if an error occurs
1289 void testDistributedLockDoExtendFreed() throws SQLException {
1290 lock = getLock(RESOURCE, callback);
1291 lock.extend(HOLD_SEC2, callback);
1293 lock.setState(LockState.UNAVAILABLE);
1295 // invoke doExtend - should do nothing
1298 assertEquals(0, getRecordCount());
1300 verify(callback, never()).lockAvailable(lock);
1304 * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1307 * @throws SQLException if an error occurs
1310 void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1311 lock = getLock(RESOURCE, callback);
1314 LockCallback callback2 = mock(LockCallback.class);
1315 lock.extend(HOLD_SEC2, callback2);
1317 // delete the record so it's forced to re-insert it
1321 long tbegin = System.currentTimeMillis();
1324 assertEquals(1, getRecordCount());
1325 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1327 assertTrue(lock.isActive());
1329 // no more callbacks should have occurred
1330 verify(callback).lockAvailable(lock);
1331 verify(callback, never()).lockUnavailable(lock);
1333 // extension should have succeeded
1334 verify(callback2).lockAvailable(lock);
1335 verify(callback2, never()).lockUnavailable(lock);
1339 * Tests doExtend() when both update and insert fail.
1343 void testDistributedLockDoExtendNeitherSucceeds() {
1345 * this feature will create a lock that returns false when doDbUpdate() is
1346 * invoked, or when doDbInsert() is invoked a second time
1348 feature = new MyLockingFeature(true) {
1350 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1351 LockCallback callback) {
1352 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1353 private static final long serialVersionUID = 1L;
1354 private int ntimes = 0;
1357 protected boolean doDbInsert(Connection conn) throws SQLException {
1362 return super.doDbInsert(conn);
1366 protected boolean doDbUpdate(Connection conn) {
1373 lock = getLock(RESOURCE, callback);
1376 LockCallback callback2 = mock(LockCallback.class);
1377 lock.extend(HOLD_SEC2, callback2);
1382 assertTrue(lock.isUnavailable());
1384 // no more callbacks should have occurred
1385 verify(callback).lockAvailable(lock);
1386 verify(callback, never()).lockUnavailable(lock);
1388 // extension should have failed
1389 verify(callback2, never()).lockAvailable(lock);
1390 verify(callback2).lockUnavailable(lock);
1394 * Tests doExtend() when an exception occurs.
1396 * @throws SQLException if an error occurs
1399 void testDistributedLockDoExtendEx() throws SQLException {
1400 lock = getLock(RESOURCE, callback);
1404 * delete the record and insert one with a different owner, which will cause
1405 * doDbInsert() to throw an exception
1408 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1410 LockCallback callback2 = mock(LockCallback.class);
1411 lock.extend(HOLD_SEC2, callback2);
1416 assertTrue(lock.isUnavailable());
1418 // no more callbacks should have occurred
1419 verify(callback).lockAvailable(lock);
1420 verify(callback, never()).lockUnavailable(lock);
1422 // extension should have failed
1423 verify(callback2, never()).lockAvailable(lock);
1424 verify(callback2).lockUnavailable(lock);
1428 void testDistributedLockToString() {
1429 String text = getLock(RESOURCE, callback).toString();
1430 assertNotNull(text);
1431 assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1435 void testMakeThreadPool() {
1436 // use a REAL feature to test this
1437 feature = new DistributedLockManager();
1439 // this should create a thread pool
1440 feature.beforeCreateLockManager(engine, new Properties());
1441 feature.afterStart(engine);
1443 assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
1447 * Performs a multithreaded test of the locking facility.
1449 * @throws InterruptedException if the current thread is interrupted while waiting for
1450 * the background threads to complete
1453 void testMultiThreaded() throws InterruptedException {
1454 ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1456 feature = new DistributedLockManager();
1457 feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1458 feature.afterStart(PolicyEngineConstants.getManager());
1460 List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1461 for (int x = 0; x < MAX_THREADS; ++x) {
1462 threads.add(new MyThread());
1465 threads.forEach(Thread::start);
1467 for (MyThread thread : threads) {
1469 assertFalse(thread.isAlive());
1472 for (MyThread thread : threads) {
1473 if (thread.err != null) {
1478 assertTrue(nsuccesses.get() > 0);
1481 private DistributedLock getLock(String resource, LockCallback callback) {
1482 return (DistributedLock) feature.createLock(resource, DistributedLockManagerTest.OWNER_KEY,
1483 DistributedLockManagerTest.HOLD_SEC, callback, false);
1486 private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1487 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1488 try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1489 oos.writeObject(lock);
1492 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1493 try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1494 return (DistributedLock) ois.readObject();
1499 * Runs the checkExpired() action.
1501 * @param nskip number of actions in the work queue to skip
1502 * @param schedSec number of seconds for which the checker should have been scheduled
1504 private void runChecker(int nskip, long schedSec) {
1505 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1506 verify(exsvc, times(nskip + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1507 Runnable action = captor.getAllValues().get(nskip);
1512 * Runs a lock action (e.g., doLock, doUnlock).
1514 * @param nskip number of actions in the work queue to skip
1515 * @param nadditional number of additional actions that appear in the work queue
1516 * <i>after</i> the desired action
1518 void runLock(int nskip, int nadditional) {
1519 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1520 verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1522 Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1527 * Runs a scheduled action (e.g., "retry" action).
1529 * @param nskip number of actions in the work queue to skip
1531 void runSchedule(int nskip) {
1532 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1533 verify(exsvc, times(PRE_SCHED_EXECS + nskip + 1)).schedule(captor.capture(), anyLong(), any());
1535 Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1540 * Gets a count of the number of lock records in the DB.
1542 * @return the number of lock records in the DB
1543 * @throws SQLException if an error occurs accessing the DB
1545 private int getRecordCount() throws SQLException {
1546 try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1547 ResultSet result = stmt.executeQuery()) {
1549 if (result.next()) {
1550 return result.getInt(1);
1559 * Determines if there is a record for the given resource whose expiration time is in
1560 * the expected range.
1562 * @param resourceId ID of the resource of interest
1563 * @param uuidString UUID string of the owner
1564 * @param holdSec seconds for which the lock was to be held
1565 * @param tbegin earliest time, in milliseconds, at which the record could have been
1566 * inserted into the DB
1567 * @return {@code true} if a record is found, {@code false} otherwise
1568 * @throws SQLException if an error occurs accessing the DB
1570 private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1571 try (PreparedStatement stmt =
1572 conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1573 + " WHERE resourceId=? AND host=? AND owner=?")) {
1575 stmt.setString(1, resourceId);
1576 stmt.setString(2, feature.getPdpName());
1577 stmt.setString(3, uuidString);
1579 try (ResultSet result = stmt.executeQuery()) {
1580 if (result.next()) {
1581 int remaining = result.getInt(1);
1582 long maxDiff = System.currentTimeMillis() - tbegin;
1583 return (remaining >= 0 && holdSec - remaining <= maxDiff);
1593 * Inserts a record into the DB.
1595 * @param resourceId ID of the resource of interest
1596 * @param uuidString UUID string of the owner
1597 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1598 * @throws SQLException if an error occurs accessing the DB
1600 private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1601 this.insertRecord(resourceId, feature.getPdpName(), uuidString, expireOffset);
1604 private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1605 throws SQLException {
1606 try (PreparedStatement stmt =
1607 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1608 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1610 stmt.setString(1, resourceId);
1611 stmt.setString(2, hostName);
1612 stmt.setString(3, uuidString);
1613 stmt.setInt(4, expireOffset);
1615 assertEquals(1, stmt.executeUpdate());
1620 * Updates a record in the DB.
1622 * @param resourceId ID of the resource of interest
1623 * @param newUuid UUID string of the <i>new</i> owner
1624 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1625 * @throws SQLException if an error occurs accessing the DB
1627 private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1628 try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1629 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1631 stmt.setString(1, newHost);
1632 stmt.setString(2, newUuid);
1633 stmt.setInt(3, expireOffset);
1634 stmt.setString(4, resourceId);
1636 assertEquals(1, stmt.executeUpdate());
1641 * Feature that uses <i>exsvc</i> to execute requests.
1643 private class MyLockingFeature extends DistributedLockManager {
1645 public MyLockingFeature(boolean init) {
1648 exsvc = mock(ScheduledExecutorService.class);
1649 lenient().when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1650 ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1653 beforeCreateLockManager(engine, new Properties());
1661 * Feature whose data source all throws exceptions.
1663 private class InvalidDbLockingFeature extends MyLockingFeature {
1664 private final boolean isTransient;
1665 private boolean freeLock = false;
1667 InvalidDbLockingFeature(boolean isTransient) {
1668 // pass "false" because we have to set the error code BEFORE calling
1672 this.isTransient = isTransient;
1674 this.beforeCreateLockManager(engine, new Properties());
1676 this.afterStart(engine);
1680 protected BasicDataSource makeDataSource() throws Exception {
1681 lenient().when(datasrc.getConnection()).thenAnswer(answer -> {
1690 doThrow(makeEx()).when(datasrc).close();
1695 protected SQLException makeEx() {
1697 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1700 return new SQLException(EXPECTED_EXCEPTION);
1706 * Feature whose locks free themselves while free() is already running.
1708 private class FreeWithFreeLockingFeature extends MyLockingFeature {
1709 private final boolean relock;
1711 public FreeWithFreeLockingFeature(boolean relock) {
1713 this.relock = relock;
1717 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1718 LockCallback callback) {
1720 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1721 private static final long serialVersionUID = 1L;
1722 private boolean checked = false;
1725 public boolean isUnavailable() {
1727 return super.isUnavailable();
1732 // release and relock
1740 createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1750 * Thread used with the multithreaded test. It repeatedly attempts to get a lock,
1751 * extend it, and then unlock it.
1753 private class MyThread extends Thread {
1754 AssertionError err = null;
1763 for (int x = 0; x < MAX_LOOPS; ++x) {
1767 } catch (AssertionError e) {
1772 private void makeAttempt() {
1774 Semaphore sem = new Semaphore(0);
1776 LockCallback cb = new LockCallback() {
1778 public void lockAvailable(Lock lock) {
1783 public void lockUnavailable(Lock lock) {
1788 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1790 // wait for callback, whether available or unavailable
1791 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1792 if (!lock.isActive()) {
1796 nsuccesses.incrementAndGet();
1798 assertEquals(1, nactive.incrementAndGet());
1800 lock.extend(HOLD_SEC2, cb);
1801 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1802 assertTrue(lock.isActive());
1804 // decrement BEFORE free()
1805 nactive.decrementAndGet();
1807 assertTrue(lock.free());
1808 assertTrue(lock.isUnavailable());
1810 } catch (InterruptedException e) {
1811 Thread.currentThread().interrupt();
1812 throw new AssertionError("interrupted", e);