2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
25 import static org.assertj.core.api.Assertions.assertThatThrownBy;
26 import static org.junit.Assert.assertEquals;
27 import static org.junit.Assert.assertFalse;
28 import static org.junit.Assert.assertNotNull;
29 import static org.junit.Assert.assertNull;
30 import static org.junit.Assert.assertSame;
31 import static org.junit.Assert.assertTrue;
32 import static org.mockito.Matchers.any;
33 import static org.mockito.Matchers.anyBoolean;
34 import static org.mockito.Matchers.anyLong;
35 import static org.mockito.Matchers.eq;
36 import static org.mockito.Mockito.doThrow;
37 import static org.mockito.Mockito.mock;
38 import static org.mockito.Mockito.never;
39 import static org.mockito.Mockito.times;
40 import static org.mockito.Mockito.verify;
41 import static org.mockito.Mockito.when;
43 import java.io.ByteArrayInputStream;
44 import java.io.ByteArrayOutputStream;
45 import java.io.ObjectInputStream;
46 import java.io.ObjectOutputStream;
47 import java.sql.Connection;
48 import java.sql.DriverManager;
49 import java.sql.PreparedStatement;
50 import java.sql.ResultSet;
51 import java.sql.SQLException;
52 import java.sql.SQLTransientException;
53 import java.util.ArrayList;
54 import java.util.List;
55 import java.util.Properties;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.RejectedExecutionException;
58 import java.util.concurrent.ScheduledExecutorService;
59 import java.util.concurrent.ScheduledFuture;
60 import java.util.concurrent.Semaphore;
61 import java.util.concurrent.TimeUnit;
62 import java.util.concurrent.atomic.AtomicBoolean;
63 import java.util.concurrent.atomic.AtomicInteger;
64 import java.util.concurrent.atomic.AtomicReference;
65 import org.apache.commons.dbcp2.BasicDataSource;
66 import org.junit.After;
67 import org.junit.AfterClass;
68 import org.junit.Before;
69 import org.junit.BeforeClass;
70 import org.junit.Test;
71 import org.mockito.ArgumentCaptor;
72 import org.mockito.Mock;
73 import org.mockito.MockitoAnnotations;
74 import org.onap.policy.common.utils.services.OrderedServiceImpl;
75 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
76 import org.onap.policy.drools.core.lock.Lock;
77 import org.onap.policy.drools.core.lock.LockCallback;
78 import org.onap.policy.drools.core.lock.LockState;
79 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
80 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
81 import org.onap.policy.drools.system.PolicyEngine;
82 import org.onap.policy.drools.system.PolicyEngineConstants;
83 import org.powermock.reflect.Whitebox;
85 public class DistributedLockManagerTest {
// Delays passed as the last argument to runChecker(); seconds, judging by the names.
86 private static final long EXPIRE_SEC = 900L;
87 private static final long RETRY_SEC = 60L;
// name of the PolicyEngine field that is read and restored via Whitebox
88 private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
// host and owner values used to make a DB record look like it belongs to someone else
89 private static final String OTHER_HOST = "other-host";
90 private static final String OTHER_OWNER = "other-owner";
// message given to the exceptions that the tests throw on purpose
91 private static final String EXPECTED_EXCEPTION = "expected exception";
// JDBC URL of the in-memory H2 database holding the "pooling" schema, plus its credentials
92 private static final String DB_CONNECTION =
93 "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
94 private static final String DB_USER = "user";
95 private static final String DB_PASSWORD = "password";
// owner key and resource ids used when requesting locks
96 private static final String OWNER_KEY = "my key";
97 private static final String RESOURCE = "my resource";
98 private static final String RESOURCE2 = "my resource #2";
99 private static final String RESOURCE3 = "my resource #3";
100 private static final String RESOURCE4 = "my resource #4";
101 private static final String RESOURCE5 = "my resource #5";
// hold times passed to getLock()/createLock()/extend()
102 private static final int HOLD_SEC = 100;
103 private static final int HOLD_SEC2 = 120;
// NOTE(review): MAX_THREADS and MAX_LOOPS are not used in the part of the file visible here
104 private static final int MAX_THREADS = 5;
105 private static final int MAX_LOOPS = 100;
// flag passed to InvalidDbLockingFeature; presumably selects a transient vs. permanent
// SQL exception - confirm against that class
106 private static final boolean TRANSIENT = true;
107 private static final boolean PERMANENT = false;
109 // number of execute() calls before the first lock attempt
110 private static final int PRE_LOCK_EXECS = 1;
112 // number of schedule() calls before the first schedule attempt
113 private static final int PRE_SCHED_EXECS = 1;
// shared DB connection, opened once in setUpBeforeClass()
115 private static Connection conn = null;
// executor found in the PolicyEngine manager before the tests ran; restored afterwards
116 private static ScheduledExecutorService saveExec;
// real thread pool created in setUpBeforeClass()
117 private static ScheduledExecutorService realExec;
// collaborators; exsvc, checker, callback and datasrc are driven through Mockito
// verify()/when() in the tests (presumably engine is a mock as well - confirm)
120 private PolicyEngine engine;
123 private ScheduledExecutorService exsvc;
126 private ScheduledFuture<?> checker;
129 private LockCallback callback;
132 private BasicDataSource datasrc;
// lock most recently requested by the running test
134 private DistributedLock lock;
// counters reset to zero in setUp()
136 private AtomicInteger nactive;
137 private AtomicInteger nsuccesses;
// feature (lock manager) under test
138 private DistributedLockManager feature;
142 * Configures the location of the property files and creates the DB.
144 * @throws SQLException if the DB cannot be created
// Points the persistence manager at the test resources, opens the shared DB connection,
// creates the pooling.locks table, and remembers the engine's current executor.
147 public static void setUpBeforeClass() throws SQLException {
148 SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
150 conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
// one row per resourceId (the primary key)
152 try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
153 + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
154 + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
155 createStmt.executeUpdate();
// saved so that tearDownAfterClass() can put it back
158 saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
// NOTE(review): where realExec is used is not visible in this part of the file
160 realExec = Executors.newScheduledThreadPool(3);
164 * Restores static fields.
// Writes the executor saved by setUpBeforeClass() back into the PolicyEngine manager.
// NOTE(review): no shutdown of realExec or close of conn is visible here - confirm both are released.
167 public static void tearDownAfterClass() throws SQLException {
168 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
174 * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
177 * @throws SQLException if the lock records cannot be deleted from the DB
// Initializes the annotated Mockito fields, zeroes the counters, and builds a fresh feature.
180 public void setUp() throws SQLException {
181 MockitoAnnotations.initMocks(this);
183 nactive = new AtomicInteger(0);
184 nsuccesses = new AtomicInteger(0);
// NOTE(review): the meaning of the boolean is defined by MyLockingFeature, outside this view;
// testAfterStart() expects execute()/schedule() to have been called already, so presumably
// "true" also starts the feature - confirm
188 feature = new MyLockingFeature(true);
// Tear-down counterpart of setUp(); presumably the JUnit @After hook.
// NOTE(review): the body is not visible in this view.
192 public void tearDown() throws SQLException {
// Removes every row from pooling.locks using the shared connection.
197 private void cleanDb() throws SQLException {
198 try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
199 stmt.executeUpdate();
// Stops the current feature, if there is one, by calling afterStop(engine).
203 private void shutdownFeature() {
// the feature may be null, so guard the call
204 if (feature != null) {
205 feature.afterStop(engine);
211 * Tests that the feature is found in the expected service sets.
// Passes when at least one PolicyEngineFeatureApi implementation listed by OrderedServiceImpl
// is a DistributedLockManager.
214 public void testServiceApis() {
215 assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
216 .anyMatch(obj -> obj instanceof DistributedLockManager));
// The feature is expected to report sequence number 1000.
220 public void testGetSequenceNumber() {
221 assertEquals(1000, feature.getSequenceNumber());
// beforeCreateLockManager() is expected to hand back the feature instance itself.
225 public void testBeforeCreateLockManager() {
226 assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
230 * Tests beforeCreate(), when getProperties() throws a runtime exception.
// A runtime exception raised by getProperties() must surface from beforeCreateLockManager()
// as a DistributedLockManagerException.
233 public void testBeforeCreateLockManagerEx() {
// replace the feature with one whose getProperties() always fails
236 feature = new MyLockingFeature(false) {
238 protected Properties getProperties(String fileName) {
239 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
243 assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
244 .isInstanceOf(DistributedLockManagerException.class);
// Checks that exactly one execute() and one schedule() call have reached exsvc.
// NOTE(review): nothing in this method triggers them; presumably setUp() already started
// the feature - confirm.
248 public void testAfterStart() {
249 // verify that cleanup & expire check are both added to the queue
250 verify(exsvc).execute(any());
251 verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
255 * Tests afterStart(), when thread pool throws a runtime exception.
// When exsvc.execute() throws, afterStart() must fail with DistributedLockManagerException.
258 public void testAfterStartExInThreadPool() {
// fresh feature constructed with "false"; presumably not yet started - confirm
261 feature = new MyLockingFeature(false);
// make the executor reject whatever afterStart() submits
263 doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
265 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
// Seeds four lock records, captures the clean-up Runnable handed to exsvc.execute(), and
// expects only the two unexpired records (RESOURCE2, RESOURCE4) to be left in the table.
269 public void testDeleteExpiredDbLocks() throws SQLException {
270 // add records: two expired, two not
271 insertRecord(RESOURCE, feature.getUuidString(), -1);
272 insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
273 insertRecord(RESOURCE3, OTHER_OWNER, 0);
274 insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
276 // get the clean-up function and execute it
277 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
278 verify(exsvc).execute(captor.capture());
280 long tbegin = System.currentTimeMillis();
281 Runnable action = captor.getValue();
// NOTE(review): no statement that runs "action" is visible in this view - confirm it is
// invoked before the assertions below
284 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
285 assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
286 assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
287 assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
289 assertEquals(2, getRecordCount());
293 * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
295 * @throws SQLException if an error occurs
// Same capture as testDeleteExpiredDbLocks(), but with a feature whose DB access fails;
// the clean-up Runnable is expected to absorb the failure.
298 public void testDeleteExpiredDbLocksEx() throws SQLException {
299 feature = new InvalidDbLockingFeature(TRANSIENT);
301 // get the clean-up function and execute it
302 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
303 verify(exsvc).execute(captor.capture());
305 Runnable action = captor.getValue();
307 // should not throw an exception
// NOTE(review): the invocation of "action" is not visible in this view
// Stopping the feature must cancel the scheduled checker; a never-started
// DistributedLockManager is then created for a second shutdown.
312 public void testAfterStop() {
// NOTE(review): the shutdown call that should precede this verification is not visible here
314 verify(checker).cancel(anyBoolean());
316 feature = new DistributedLockManager();
318 // shutdown without calling afterStart()
324 * Tests afterStop(), when the data source throws an exception when close() is called.
326 * @throws SQLException if an error occurs
// Swaps in an InvalidDbLockingFeature so that shutdown hits a failing data source.
// NOTE(review): the shutdown call(s) and any assertion are not visible in this view.
329 public void testAfterStopEx() throws SQLException {
332 // use a data source that throws an exception when closed
333 feature = new InvalidDbLockingFeature(TRANSIENT);
// Walks one lock from WAITING to ACTIVE and checks that further requests for the same
// resource are refused at once, while a request for a different resource is accepted.
339 public void testCreateLock() throws SQLException {
// exactly one execute() call exists before any lock is requested
340 verify(exsvc).execute(any());
342 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
343 assertTrue(lock.isWaiting());
// requesting the lock queued one more execute() call
345 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
347 // this lock should fail
348 LockCallback callback2 = mock(LockCallback.class);
349 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
350 assertTrue(lock2.isUnavailable());
351 verify(callback2, never()).lockAvailable(lock2);
352 verify(callback2).lockUnavailable(lock2);
354 // this should fail, too
355 LockCallback callback3 = mock(LockCallback.class);
356 DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
357 assertTrue(lock3.isUnavailable());
358 verify(callback3, never()).lockAvailable(lock3);
359 verify(callback3).lockUnavailable(lock3);
361 // no change to first
362 assertTrue(lock.isWaiting());
364 // no callbacks to the first lock
365 verify(callback, never()).lockAvailable(lock);
366 verify(callback, never()).lockUnavailable(lock);
// still nothing in the DB while the lock is only waiting
368 assertTrue(lock.isWaiting());
369 assertEquals(0, getRecordCount());
// NOTE(review): the step that moves the lock from waiting to active is not visible here
372 assertTrue(lock.isActive());
373 assertEquals(1, getRecordCount());
375 verify(callback).lockAvailable(lock);
376 verify(callback, never()).lockUnavailable(lock);
378 // this should succeed
379 DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
380 assertTrue(lock4.isWaiting());
382 // after running checker, original records should still remain
383 runChecker(0, 0, EXPIRE_SEC);
384 assertEquals(1, getRecordCount());
385 verify(callback, never()).lockUnavailable(lock);
389 * Tests createLock() when the feature is not the latest instance.
// With the latest-instance reference cleared, createLock() must return a lock that is
// already unavailable and notify the callback accordingly.
392 public void testCreateLockNotLatestInstance() {
393 DistributedLockManager.setLatestInstance(null);
// local "lock" shadows the field of the same name
395 Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
396 assertTrue(lock.isUnavailable());
397 verify(callback, never()).lockAvailable(any());
398 verify(callback).lockUnavailable(lock);
// Requests five locks, alters three of the DB records (expired, other host, other owner),
// runs the checker, and expects exactly those three locks to become unavailable.
402 public void testCheckExpired() throws SQLException {
403 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
406 LockCallback callback2 = mock(LockCallback.class);
407 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
410 LockCallback callback3 = mock(LockCallback.class);
411 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
414 LockCallback callback4 = mock(LockCallback.class);
415 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
418 LockCallback callback5 = mock(LockCallback.class);
419 final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
// NOTE(review): the steps that write the five records to the DB are not visible in this view
422 assertEquals(5, getRecordCount());
// make the first record look expired (negative hold time)
425 updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
427 // change host of another record
428 updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
430 // change uuid of another record
431 updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
435 runChecker(0, 0, EXPIRE_SEC);
// locks whose records were altered are lost; untouched ones stay active
439 assertTrue(lock.isUnavailable());
440 assertTrue(lock2.isActive());
441 assertTrue(lock3.isUnavailable());
442 assertTrue(lock4.isActive());
443 assertTrue(lock5.isUnavailable());
// only the owners of the lost locks are notified
449 verify(callback).lockUnavailable(lock);
450 verify(callback3).lockUnavailable(lock3);
451 verify(callback5).lockUnavailable(lock5);
453 verify(callback2, never()).lockUnavailable(lock2);
454 verify(callback4, never()).lockUnavailable(lock4);
457 // another check should have been scheduled, with the normal interval
458 runChecker(1, 0, EXPIRE_SEC);
462 * Tests checkExpired(), when schedule() throws an exception.
// Runs the checker while exsvc.schedule() rejects every request; the test passes as long
// as runChecker() itself does not throw.
465 public void testCheckExpiredExecRejected() {
466 // arrange for execution to be rejected
467 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
468 .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
470 runChecker(0, 0, EXPIRE_SEC);
474 * Tests checkExpired(), when getConnection() throws an exception.
// When the checker's DB access fails, the next check is expected to be scheduled with
// RETRY_SEC instead of EXPIRE_SEC.
477 public void testCheckExpiredSqlEx() {
478 // use a data source that throws an exception when getConnection() is called
479 feature = new InvalidDbLockingFeature(TRANSIENT);
481 runChecker(0, 0, EXPIRE_SEC);
483 // it should have scheduled another check, sooner
484 runChecker(0, 0, RETRY_SEC);
488 * Tests checkExpired(), when getConnection() throws an exception and the feature is
// When the DB access fails and the feature has been stopped, the checker must not
// schedule itself again: schedule() is still expected to have been called only once.
492 public void testCheckExpiredSqlExFeatureStopped() {
493 // use a data source that throws an exception when getConnection() is called
494 feature = new InvalidDbLockingFeature(TRANSIENT) {
// NOTE(review): the statement that stops the feature inside this override is not visible
// in this view; only the delegation to super remains
496 protected SQLException makeEx() {
498 return super.makeEx();
502 runChecker(0, 0, EXPIRE_SEC);
504 // it should NOT have scheduled another check
505 verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
// Runs the checker against four locks in different situations (expired record, healthy,
// still waiting, freed while the checker runs) and checks the resulting states and callbacks.
509 public void testExpireLocks() throws SQLException {
// holds the lock to be released from inside getConnection(); getAndSet(null) takes it once
510 AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
512 feature = new MyLockingFeature(true) {
514 protected BasicDataSource makeDataSource() throws Exception {
515 // get the real data source
516 BasicDataSource src2 = super.makeDataSource();
// the mocked data source hands out real connections from src2
518 when(datasrc.getConnection()).thenAnswer(answer -> {
519 DistributedLock lck = freeLock.getAndSet(null);
// NOTE(review): what is done with "lck" is not visible in this view
528 return src2.getConnection();
535 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
538 LockCallback callback2 = mock(LockCallback.class);
539 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
542 LockCallback callback3 = mock(LockCallback.class);
543 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
544 // don't run doLock for lock3 - leave it in the waiting state
546 LockCallback callback4 = mock(LockCallback.class);
547 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
// three records expected: lock3 never reached the DB
550 assertEquals(3, getRecordCount());
// make the first record look expired (negative hold time)
553 updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
555 // arrange to free lock4 while the checker is running
// NOTE(review): the statement that populates freeLock is not visible in this view
559 runChecker(0, 0, EXPIRE_SEC);
563 assertTrue(lock.isUnavailable());
564 assertTrue(lock2.isActive());
565 assertTrue(lock3.isWaiting());
566 assertTrue(lock4.isUnavailable());
569 verify(exsvc, times(PRE_LOCK_EXECS + 6)).execute(any());
// only the expired lock triggers lockUnavailable(); lock4 is not reported this way
571 verify(callback).lockUnavailable(lock);
572 verify(callback2, never()).lockUnavailable(lock2);
573 verify(callback3, never()).lockUnavailable(lock3);
574 verify(callback4, never()).lockUnavailable(lock4);
// A lock built with the no-arg constructor has null resource id, owner key and callback,
// and a hold time of zero.
578 public void testDistributedLockNoArgs() {
579 DistributedLock lock = new DistributedLock();
580 assertNull(lock.getResourceId());
581 assertNull(lock.getOwnerKey());
582 assertNull(lock.getCallback());
583 assertEquals(0, lock.getHoldSec());
// A negative hold time is rejected with an IllegalArgumentException whose message mentions
// "holdSec"; a positive hold time is accepted.
587 public void testDistributedLock() {
588 assertThatIllegalArgumentException()
589 .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
590 .withMessageContaining("holdSec");
592 // should generate no exception
593 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// After roundTrip() the lock keeps its state, resource id, owner key and hold time,
// but not its callback.
597 public void testDistributedLockSerializable() throws Exception {
598 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// presumably serializes and deserializes the lock - roundTrip() is defined outside this view
599 lock = roundTrip(lock);
601 assertTrue(lock.isWaiting());
603 assertEquals(RESOURCE, lock.getResourceId());
604 assertEquals(OWNER_KEY, lock.getOwnerKey());
// the callback is expected to be null after the round trip
605 assertNull(lock.getCallback());
606 assertEquals(HOLD_SEC, lock.getHoldSec());
// A new lock is not active until its doLock() request has run; once active, the callback
// has been told that the lock is available.
610 public void testGrant() {
611 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
612 assertFalse(lock.isActive());
614 // execute the doLock() call
// NOTE(review): the call itself is not visible in this view
617 assertTrue(lock.isActive());
619 // the callback for the lock should have been run in the foreground thread
620 verify(callback).lockAvailable(lock);
// A second request for a resource that already has a pending lock is denied immediately,
// without queuing another execute() call.
624 public void testDistributedLockDeny() {
// first request - occupies the resource
626 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
628 // get another lock - should fail
629 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
631 assertTrue(lock.isUnavailable());
633 // the callback for the second lock should have been run in the foreground thread
634 verify(callback).lockUnavailable(lock);
636 // should only have a request for the first lock
637 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
// free() succeeds once and leaves the lock unavailable; a second free() returns false and
// queues nothing; afterwards the resource can be requested again.
641 public void testDistributedLockFree() {
642 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
644 assertTrue(lock.free());
645 assertTrue(lock.isUnavailable());
647 // run both requests associated with the lock
// NOTE(review): the calls that run the requests are not visible in this view
651 // should not have changed state
652 assertTrue(lock.isUnavailable());
654 // attempt to free it again
655 assertFalse(lock.free());
657 // should not have queued anything else
658 verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
660 // new lock should succeed
661 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// must be a distinct object, not the freed lock handed back
662 assertTrue(lock2 != lock);
663 assertTrue(lock2.isWaiting());
667 * Tests that free() works on a serialized lock with a new feature.
669 * @throws Exception if an error occurs
// A lock that went through roundTrip() can still be freed after the feature is replaced.
672 public void testDistributedLockFreeSerialized() throws Exception {
673 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// new feature instance; the lock was created by the previous one
675 feature = new MyLockingFeature(true);
677 lock = roundTrip(lock);
678 assertTrue(lock.free());
679 assertTrue(lock.isUnavailable());
683 * Tests free() on a serialized lock without a feature.
685 * @throws Exception if an error occurs
// With the latest-instance reference cleared, free() on a round-tripped lock returns false,
// yet the lock still ends up unavailable.
688 public void testDistributedLockFreeNoFeature() throws Exception {
689 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
691 DistributedLockManager.setLatestInstance(null);
693 lock = roundTrip(lock);
694 assertFalse(lock.free());
695 assertTrue(lock.isUnavailable());
699 * Tests the case where the lock is freed and doUnlock called between the call to
700 * isUnavailable() and the call to compute().
// Uses FreeWithFreeLockingFeature(true) to simulate the lock being freed concurrently;
// free() is then expected to return false with the lock unavailable.
703 public void testDistributedLockFreeUnlocked() {
// NOTE(review): FreeWithFreeLockingFeature and the meaning of its flag are defined outside this view
704 feature = new FreeWithFreeLockingFeature(true);
706 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
708 assertFalse(lock.free());
709 assertTrue(lock.isUnavailable());
713 * Tests the case where the lock is freed, but doUnlock is not completed, between the
714 * call to isUnavailable() and the call to compute().
// Same scenario as testDistributedLockFreeUnlocked(), but with the feature flag set to
// false; the expected outcome is identical.
717 public void testDistributedLockFreeLockFreed() {
// NOTE(review): FreeWithFreeLockingFeature and the meaning of its flag are defined outside this view
718 feature = new FreeWithFreeLockingFeature(false);
720 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
722 assertFalse(lock.free());
723 assertTrue(lock.isUnavailable());
// extend() on a denied lock keeps reporting lockUnavailable(), a negative hold time is
// rejected, and extending the lock that owns the resource updates its hold time.
727 public void testDistributedLockExtend() {
728 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
730 // lock2 should be denied - called back by this thread
731 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
732 verify(callback, never()).lockAvailable(lock2);
733 verify(callback).lockUnavailable(lock2);
735 // lock2 will still be denied - called back by this thread
736 lock2.extend(HOLD_SEC, callback);
737 verify(callback, times(2)).lockUnavailable(lock2);
739 // force lock2 to be active - should still be denied
740 Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
741 lock2.extend(HOLD_SEC, callback);
742 verify(callback, times(3)).lockUnavailable(lock2);
// argument validation: a negative hold time is rejected
744 assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
745 .withMessageContaining("holdSec");
// NOTE(review): the step that activates the first lock is not visible in this view
749 assertTrue(lock.isActive());
751 // now extend the first lock
752 LockCallback callback2 = mock(LockCallback.class);
753 lock.extend(HOLD_SEC2, callback2);
754 assertTrue(lock.isWaiting());
756 // execute doExtend()
758 lock.extend(HOLD_SEC2, callback2);
759 assertEquals(HOLD_SEC2, lock.getHoldSec());
760 verify(callback2).lockAvailable(lock);
761 verify(callback2, never()).lockUnavailable(lock);
765 * Tests that extend() works on a serialized lock with a new feature.
767 * @throws Exception if an error occurs
// An active lock that went through roundTrip() can be extended under a replacement feature
// and becomes active again, with lockAvailable() delivered to the new callback.
770 public void testDistributedLockExtendSerialized() throws Exception {
771 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// NOTE(review): the step that activates the lock is not visible in this view
775 assertTrue(lock.isActive());
777 feature = new MyLockingFeature(true);
779 lock = roundTrip(lock);
780 assertTrue(lock.isActive());
// a fresh callback is supplied to extend()
782 LockCallback scallback = mock(LockCallback.class);
784 lock.extend(HOLD_SEC, scallback);
785 assertTrue(lock.isWaiting());
787 // run doExtend (in new feature)
789 assertTrue(lock.isActive());
791 verify(scallback).lockAvailable(lock);
792 verify(scallback, never()).lockUnavailable(lock);
796 * Tests extend() on a serialized lock without a feature.
798 * @throws Exception if an error occurs
// With the latest-instance reference cleared, extend() on a round-tripped lock fails:
// the lock becomes unavailable and the callback receives lockUnavailable().
801 public void testDistributedLockExtendNoFeature() throws Exception {
802 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// NOTE(review): the step that activates the lock is not visible in this view
806 assertTrue(lock.isActive());
808 DistributedLockManager.setLatestInstance(null);
810 lock = roundTrip(lock);
811 assertTrue(lock.isActive());
813 LockCallback scallback = mock(LockCallback.class);
815 lock.extend(HOLD_SEC, scallback);
816 assertTrue(lock.isUnavailable());
818 verify(scallback, never()).lockAvailable(lock);
819 verify(scallback).lockUnavailable(lock);
823 * Tests the case where the lock is freed and doUnlock called between the call to
824 * isUnavailable() and the call to compute().
// Uses FreeWithFreeLockingFeature(true) to simulate the lock being freed concurrently;
// extend() must then leave the lock unavailable and report lockUnavailable().
827 public void testDistributedLockExtendUnlocked() {
// NOTE(review): FreeWithFreeLockingFeature and the meaning of its flag are defined outside this view
828 feature = new FreeWithFreeLockingFeature(true);
830 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
832 lock.extend(HOLD_SEC2, callback);
833 assertTrue(lock.isUnavailable());
834 verify(callback).lockUnavailable(lock);
838 * Tests the case where the lock is freed, but doUnlock is not completed, between the
839 * call to isUnavailable() and the call to compute().
// Same scenario as testDistributedLockExtendUnlocked(), but with the feature flag set to
// false; the expected outcome is identical.
842 public void testDistributedLockExtendLockFreed() {
// NOTE(review): FreeWithFreeLockingFeature and the meaning of its flag are defined outside this view
843 feature = new FreeWithFreeLockingFeature(false);
845 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
847 lock.extend(HOLD_SEC2, callback);
848 assertTrue(lock.isUnavailable());
849 verify(callback).lockUnavailable(lock);
// A requested lock is expected to end with lockAvailable() delivered to the callback.
// NOTE(review): the step that runs the queued request is not visible in this view.
853 public void testDistributedLockScheduleRequest() {
854 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
857 verify(callback).lockAvailable(lock);
// With a failing data source the lock request is rescheduled; when the lock is freed
// meanwhile, the rescheduled request turns into an unlock that is itself retried.
861 public void testDistributedLockRescheduleRequest() throws SQLException {
862 // use a data source that throws an exception when getConnection() is called
863 InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
// NOTE(review): the assignment of invfeat to "feature" is not visible in this view
866 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
868 // invoke doLock - should fail and reschedule
871 // should still be waiting
872 assertTrue(lock.isWaiting());
873 verify(callback, never()).lockUnavailable(lock);
875 // free the lock while doLock is executing
876 invfeat.freeLock = true;
878 // try scheduled request - should just invoke doUnlock
881 // should now be unavailable
882 assertTrue(lock.isUnavailable());
883 verify(callback, never()).lockUnavailable(lock);
885 // should have scheduled a retry of doUnlock
886 verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
// Requests a lock so that running doLock exercises getNextRequest() as described below.
// NOTE(review): the doLock invocation and any assertion are not visible in this view.
890 public void testDistributedLockGetNextRequest() {
891 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
894 * run doLock. This should cause getNextRequest() to be called twice, once with a
895 * request in the queue, and the second time with request=null.
901 * Tests getNextRequest(), where the same request is still in the queue the second
// With a failing data source the same request stays queued, so exactly one additional
// schedule() call is expected.
905 public void testDistributedLockGetNextRequestSameRequest() {
906 // force reschedule to be invoked
907 feature = new InvalidDbLockingFeature(TRANSIENT);
909 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
912 * run doLock. This should cause getNextRequest() to be called twice, once with a
913 * request in the queue, and the second time with the same request again.
// NOTE(review): the doLock invocation is not visible in this view
917 verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
// A waiting lock becomes active once its queued request has been run.
921 public void testDistributedLockDoRequest() throws SQLException {
922 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
924 assertTrue(lock.isWaiting());
926 // run doLock via doRequest
// NOTE(review): the call itself is not visible in this view
929 assertTrue(lock.isActive());
933 * Tests doRequest(), when doRequest() is already running within another thread.
// Re-enters the request processing from a background thread while the foreground thread is
// inside doDbInsert(), and expects the background thread to have finished by then.
936 public void testDistributedLockDoRequestBusy() {
938 * this feature will invoke a request in a background thread while it's being run
939 * in a foreground thread.
// NOTE(review): "running" is never read or written in the lines visible here
941 AtomicBoolean running = new AtomicBoolean(false);
// set to true when the background thread was no longer alive at the check below
942 AtomicBoolean returned = new AtomicBoolean(false);
944 feature = new MyLockingFeature(true) {
// every lock created by this feature gets the instrumented doDbInsert() below
946 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
947 LockCallback callback) {
948 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
949 private static final long serialVersionUID = 1L;
952 protected boolean doDbInsert(Connection conn) throws SQLException {
// NOTE(review): the guard selecting this early return is not visible in this view
954 // already inside the thread - don't recurse any further
955 return super.doDbInsert(conn);
960 Thread thread = new Thread(() -> {
961 // run doLock from within the new thread
// daemon thread, so a hang here cannot keep the JVM alive
964 thread.setDaemon(true);
967 // wait for the background thread to complete before continuing
// restore the interrupt flag instead of swallowing the interruption
970 } catch (InterruptedException ignore) {
971 Thread.currentThread().interrupt();
// true only when the background thread has already terminated
974 returned.set(!thread.isAlive());
976 return super.doDbInsert(conn);
982 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
987 assertTrue(returned.get());
991 * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
993 * @throws SQLException if an error occurs
// A runtime exception from getConnection() while the lock is waiting must make the lock
// unavailable, notify the callback, and not schedule a retry.
996 public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
997 // throw run-time exception
998 when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1000 // use a data source that throws an exception when getConnection() is called
1001 feature = new MyLockingFeature(true) {
// NOTE(review): the body of this override is not visible here; presumably it returns datasrc
1003 protected BasicDataSource makeDataSource() throws Exception {
1008 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1010 // invoke doLock - should NOT reschedule
1013 assertTrue(lock.isUnavailable());
1014 verify(callback).lockUnavailable(lock);
// schedule() count is unchanged, i.e. no retry was queued
1016 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1020 * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1023 * @throws SQLException if an error occurs
// A runtime exception from getConnection() when the lock is already unavailable must not
// notify the callback and must not schedule a retry.
1026 public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1027 // throw run-time exception
1028 when(datasrc.getConnection()).thenAnswer(answer -> {
// NOTE(review): the statement preceding the throw (presumably one that frees the lock) is
// not visible in this view
1030 throw new IllegalStateException(EXPECTED_EXCEPTION);
1033 // use a data source that throws an exception when getConnection() is called
1034 feature = new MyLockingFeature(true) {
// NOTE(review): the body of this override is not visible here; presumably it returns datasrc
1036 protected BasicDataSource makeDataSource() throws Exception {
1041 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1043 // invoke doLock - should NOT reschedule
1046 assertTrue(lock.isUnavailable());
1047 verify(callback, never()).lockUnavailable(lock);
// schedule() count is unchanged, i.e. no retry was queued
1049 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1053 * Tests doRequest() when the retry count gets exhausted.
// With a data source that keeps failing, the lock stays waiting through the retries and
// becomes unavailable, with the callback notified, only after the final retry fails.
1056 public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1057 // use a data source that throws an exception when getConnection() is called
1058 feature = new InvalidDbLockingFeature(TRANSIENT);
1060 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1062 // invoke doLock - should fail and reschedule
// NOTE(review): none of the three invocations in this test is visible in this view
1065 // should still be waiting
1066 assertTrue(lock.isWaiting());
1067 verify(callback, never()).lockUnavailable(lock);
1069 // try again, via SCHEDULER - first retry fails
1072 // should still be waiting
1073 assertTrue(lock.isWaiting());
1074 verify(callback, never()).lockUnavailable(lock);
1076 // try again, via SCHEDULER - final retry fails
1078 assertTrue(lock.isUnavailable());
1080 // now callback should have been called
1081 verify(callback).lockUnavailable(lock);
1085 * Tests doRequest() when a non-transient DB exception is thrown.
// A PERMANENT DB failure makes the lock unavailable at once: the callback is notified and
// neither execute() nor schedule() is called again.
1088 public void testDistributedLockDoRequestNotTransient() {
1090 * use a data source that throws a PERMANENT exception when getConnection() is
1093 feature = new InvalidDbLockingFeature(PERMANENT);
1095 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1097 // invoke doLock - should fail
// NOTE(review): the invocation itself is not visible in this view
1100 assertTrue(lock.isUnavailable());
1101 verify(callback).lockUnavailable(lock);
1103 // should not have scheduled anything new
1104 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1105 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
// Running doLock on a fresh lock must leave exactly one DB record for the resource, owned
// by this feature, and report lockAvailable().
1109 public void testDistributedLockDoLock() throws SQLException {
1110 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1112 // invoke doLock - should simply do an insert
// start time, handed to recordInRange() below
1113 long tbegin = System.currentTimeMillis();
// NOTE(review): the doLock invocation is not visible in this view
1116 assertEquals(1, getRecordCount());
1117 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1118 verify(callback).lockAvailable(lock);
1122 * Tests doLock() when the lock is freed before doLock runs.
1124 * @throws SQLException if an error occurs
1127 public void testDistributedLockDoLockFreed() throws SQLException {
// The state is forced to UNAVAILABLE before the queued lock action runs, so the action
// is expected to leave the DB untouched and never report the lock as available.
1128 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1130 lock.setState(LockState.UNAVAILABLE);
1132 // invoke doLock - should do nothing
1135 assertEquals(0, getRecordCount());
1137 verify(callback, never()).lockAvailable(lock);
1141 * Tests doLock() when a DB exception is thrown.
1144 public void testDistributedLockDoLockEx() {
// Scenario: the data source fails with a PERMANENT error, so the lock attempt must end
// with a lockUnavailable() callback.
1145 // use a data source that throws an exception when getConnection() is called
1146 feature = new InvalidDbLockingFeature(PERMANENT);
1148 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1150 // invoke doLock - should fail because the data source throws an exception
1153 // lock should have failed due to exception
1154 verify(callback).lockUnavailable(lock);
1158 * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1162 public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
// A record for the resource already exists but its expiration offset is zero seconds,
// so the lock request is expected to take it over and report lockAvailable().
1163 // insert an expired record
1164 insertRecord(RESOURCE, feature.getUuidString(), 0);
1166 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1168 // invoke doLock - should simply do an update
1170 verify(callback).lockAvailable(lock);
1174 * Tests doLock() when a locked record already exists.
1177 public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
// Scenario: another owner already holds the resource, so this lock request must be denied.
1178 // insert a record held by a different owner that does not expire for HOLD_SEC seconds
1179 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1181 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1186 // lock should have failed because it's already locked
1187 verify(callback).lockUnavailable(lock);
1191 public void testDistributedLockDoUnlock() throws SQLException {
// Expects that, once the unlock action has run, the DB record is gone, the lock is
// unavailable, and the only callback ever made is the original lockAvailable().
1192 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1199 // invoke doUnlock()
1200 long tbegin = System.currentTimeMillis();
1203 assertEquals(0, getRecordCount());
1204 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1206 assertTrue(lock.isUnavailable());
1208 // no more callbacks should have occurred
1209 verify(callback, times(1)).lockAvailable(lock);
1210 verify(callback, never()).lockUnavailable(lock);
1214 * Tests doUnlock() when a DB exception is thrown.
1216 * @throws SQLException if an error occurs
1219 public void testDistributedLockDoUnlockEx() throws SQLException {
// Scenario: the data source fails with a PERMANENT error while unlocking. The lock must
// still end up unavailable, and neither callback method may be invoked.
1220 feature = new InvalidDbLockingFeature(PERMANENT);
1222 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1224 // do NOT invoke doLock() - it will fail without a DB connection
1228 // invoke doUnlock()
1231 assertTrue(lock.isUnavailable());
1233 // no more callbacks should have occurred
1234 verify(callback, never()).lockAvailable(lock);
1235 verify(callback, never()).lockUnavailable(lock);
1239 public void testDistributedLockDoExtend() throws SQLException {
// Extends the lock to HOLD_SEC2 using a second callback. The single DB record must
// reflect the new hold time, the lock must stay active, and the extension result must
// be delivered to callback2 only.
1240 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1243 LockCallback callback2 = mock(LockCallback.class);
1244 lock.extend(HOLD_SEC2, callback2);
// tbegin is captured first so that recordInRange() can bound the elapsed time
1247 long tbegin = System.currentTimeMillis();
1250 assertEquals(1, getRecordCount());
1251 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1253 assertTrue(lock.isActive());
1255 // no more callbacks should have occurred
1256 verify(callback).lockAvailable(lock);
1257 verify(callback, never()).lockUnavailable(lock);
1259 // extension should have succeeded
1260 verify(callback2).lockAvailable(lock);
1261 verify(callback2, never()).lockUnavailable(lock);
1265 * Tests doExtend() when the lock is freed before doExtend runs.
1267 * @throws SQLException if an error occurs
1270 public void testDistributedLockDoExtendFreed() throws SQLException {
// The state is forced to UNAVAILABLE after extend() has been requested, so the queued
// extend action is expected to leave the DB untouched and make no lockAvailable() call.
1271 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1272 lock.extend(HOLD_SEC2, callback);
1274 lock.setState(LockState.UNAVAILABLE);
1276 // invoke doExtend - should do nothing
1279 assertEquals(0, getRecordCount());
1281 verify(callback, never()).lockAvailable(lock);
1285 * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1288 * @throws SQLException if an error occurs
1291 public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
// Scenario: the lock's DB record is removed before the extension runs, so the extension
// has to re-insert it. The outcome must look like any other successful extension.
1292 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1295 LockCallback callback2 = mock(LockCallback.class);
1296 lock.extend(HOLD_SEC2, callback2);
1298 // delete the record so it's forced to re-insert it
// tbegin is captured first so that recordInRange() can bound the elapsed time
1302 long tbegin = System.currentTimeMillis();
1305 assertEquals(1, getRecordCount());
1306 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1308 assertTrue(lock.isActive());
1310 // no more callbacks should have occurred
1311 verify(callback).lockAvailable(lock);
1312 verify(callback, never()).lockUnavailable(lock);
1314 // extension should have succeeded
1315 verify(callback2).lockAvailable(lock);
1316 verify(callback2, never()).lockUnavailable(lock);
1320 * Tests doExtend() when both update and insert fail.
1322 * @throws SQLException if an error occurs
1325 public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
// Scenario: during the extension both the DB update and the fallback insert fail, so
// the lock must become unavailable and callback2 must be told so, while the original
// callback only ever sees its initial lockAvailable().
1327 * this feature will create a lock that returns false when doDbUpdate() is
1328 * invoked, or when doDbInsert() is invoked a second time
1330 feature = new MyLockingFeature(true) {
1332 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1333 LockCallback callback) {
1334 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1335 private static final long serialVersionUID = 1L;
// NOTE(review): presumably counts doDbInsert() calls so that only the second one
// fails - the counting logic is not visible here
1336 private int ntimes = 0;
1339 protected boolean doDbInsert(Connection conn) throws SQLException {
1344 return super.doDbInsert(conn);
1348 protected boolean doDbUpdate(Connection conn) {
1355 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// request the extension with a separate callback so the two outcomes can be told apart
1358 LockCallback callback2 = mock(LockCallback.class);
1359 lock.extend(HOLD_SEC2, callback2);
1364 assertTrue(lock.isUnavailable());
1366 // no more callbacks should have occurred
1367 verify(callback).lockAvailable(lock);
1368 verify(callback, never()).lockUnavailable(lock);
1370 // extension should have failed
1371 verify(callback2, never()).lockAvailable(lock);
1372 verify(callback2).lockUnavailable(lock);
1376 * Tests doExtend() when an exception occurs.
1378 * @throws SQLException if an error occurs
1381 public void testDistributedLockDoExtendEx() throws SQLException {
// Scenario: by the time the extension runs, the resource's record belongs to
// OTHER_OWNER, so the extension fails: the lock becomes unavailable and only
// callback2 is notified of the failure.
1382 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1386 * delete the record and insert one with a different owner, which will cause
1387 * doDbInsert() to throw an exception
1390 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1392 LockCallback callback2 = mock(LockCallback.class);
1393 lock.extend(HOLD_SEC2, callback2);
1398 assertTrue(lock.isUnavailable());
1400 // no more callbacks should have occurred
1401 verify(callback).lockAvailable(lock);
1402 verify(callback, never()).lockUnavailable(lock);
1404 // extension should have failed
1405 verify(callback2, never()).lockAvailable(lock);
1406 verify(callback2).lockUnavailable(lock);
1410 public void testDistributedLockToString() {
// toString() must yield text, and that text must not mention the "ownerInfo" or
// "callback" fields
1411 String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1412 assertNotNull(text);
1413 assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1417 public void testMakeThreadPool() {
// Exercises the production DistributedLockManager (not one of the test subclasses)
// through its start-up sequence, using default (empty) properties.
1418 // use a REAL feature to test this
1419 feature = new DistributedLockManager();
1421 // this should create a thread pool
1422 feature.beforeCreateLockManager(engine, new Properties());
1423 feature.afterStart(engine);
1429 * Performs a multi-threaded test of the locking facility.
1431 * @throws InterruptedException if the current thread is interrupted while waiting for
1432 * the background threads to complete
1435 public void testMultiThreaded() throws InterruptedException {
// swap the engine's executor field for realExec, then build a real lock manager on top
// of it (NOTE(review): realExec is presumably a genuine executor service - confirm)
1436 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1438 feature = new DistributedLockManager();
1439 feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1440 feature.afterStart(PolicyEngineConstants.getManager());
// create all of the worker threads before starting any of them
1442 List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1443 for (int x = 0; x < MAX_THREADS; ++x) {
1444 threads.add(new MyThread());
1447 threads.forEach(Thread::start);
// every worker must have terminated by the time it is checked here
1449 for (MyThread thread : threads) {
1451 assertFalse(thread.isAlive());
// look for an assertion failure that a worker recorded in its "err" field
1454 for (MyThread thread : threads) {
1455 if (thread.err != null) {
// at least one lock attempt must have succeeded across all of the workers
1460 assertTrue(nsuccesses.get() > 0);
// Creates a lock via the current "feature" and narrows the result to DistributedLock so
// that tests can reach the implementation-specific methods.
1463 private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1464 boolean waitForLock) {
1465 return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
// Serializes the given lock with Java object serialization into an in-memory buffer and
// then deserializes it, returning the reconstructed copy.
1468 private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1469 ByteArrayOutputStream baos = new ByteArrayOutputStream();
// the output stream is closed (and thus flushed) before the bytes are read back
1470 try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1471 oos.writeObject(lock);
1474 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1475 try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1476 return (DistributedLock) ois.readObject();
1481 * Runs the checkExpired() action.
1483 * @param nskip number of actions in the work queue to skip
1484 * @param nadditional number of additional actions that appear in the work queue
1485 * <i>after</i> the checkExpired action
1486 * @param schedSec number of seconds for which the checker should have been scheduled
1488 private void runChecker(int nskip, int nadditional, long schedSec) {
// the verification doubles as an assertion: exactly nskip + nadditional + 1 actions must
// have been scheduled with a delay of schedSec seconds
1489 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1490 verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
// captured values are in invocation order, so index nskip is the action of interest
1491 Runnable action = captor.getAllValues().get(nskip);
1496 * Runs a lock action (e.g., doLock, doUnlock).
1498 * @param nskip number of actions in the work queue to skip
1499 * @param nadditional number of additional actions that appear in the work queue
1500 * <i>after</i> the desired action
1502 void runLock(int nskip, int nadditional) {
// the verification doubles as an assertion on the total number of execute() calls,
// including the PRE_LOCK_EXECS calls that form the baseline
1503 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1504 verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
// captured values are in invocation order; skip the baseline and the first nskip actions
1506 Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1511 * Runs a scheduled action (e.g., "retry" action).
1513 * @param nskip number of actions in the work queue to skip
1514 * @param nadditional number of additional actions that appear in the work queue
1515 * <i>after</i> the desired action
1517 void runSchedule(int nskip, int nadditional) {
// the verification doubles as an assertion on the total number of schedule() calls,
// whatever their delay, including the PRE_SCHED_EXECS calls that form the baseline
1518 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1519 verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
// captured values are in invocation order; skip the baseline and the first nskip actions
1521 Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1526 * Gets a count of the number of lock records in the DB.
1528 * @return the number of lock records in the DB
1529 * @throws SQLException if an error occurs accessing the DB
1531 private int getRecordCount() throws SQLException {
// both the statement and the result set are closed automatically by try-with-resources
1532 try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1533 ResultSet result = stmt.executeQuery()) {
// the count is the first (and only) column of the single result row
1535 if (result.next()) {
1536 return result.getInt(1);
1545 * Determines if there is a record for the given resource whose expiration time is in
1546 * the expected range.
1548 * @param resourceId ID of the resource of interest
1549 * @param uuidString UUID string of the owner
1550 * @param holdSec seconds for which the lock was to be held
1551 * @param tbegin earliest time, in milliseconds, at which the record could have been
1552 * inserted into the DB
1553 * @return {@code true} if a record is found and its expiration time is in the expected range, {@code false} otherwise
1554 * @throws SQLException if an error occurs accessing the DB
1556 private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
// the query yields the number of seconds from the DB's "now" until the record expires
1557 try (PreparedStatement stmt =
1558 conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1559 + " WHERE resourceId=? AND host=? AND owner=?")) {
1561 stmt.setString(1, resourceId);
// the host is always this feature's host name; only the owner UUID is caller-supplied
1562 stmt.setString(2, feature.getHostName());
1563 stmt.setString(3, uuidString);
1565 try (ResultSet result = stmt.executeQuery()) {
1566 if (result.next()) {
1567 int remaining = result.getInt(1);
// NOTE(review): maxDiff is in milliseconds, whereas holdSec and "remaining" are in
// seconds, so the upper-bound check below is far more lenient than the units suggest
// - confirm whether that is intended
1568 long maxDiff = System.currentTimeMillis() - tbegin;
// in range: not yet expired, and no more of the hold time used up than maxDiff allows
1569 return (remaining >= 0 && holdSec - remaining <= maxDiff);
1579 * Inserts a record into the DB.
1581 * @param resourceId ID of the resource of interest
1582 * @param uuidString UUID string of the owner
1583 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1584 * @throws SQLException if an error occurs accessing the DB
1586 private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
// convenience overload: the record's host is this feature's own host name
1587 this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
// Inserts a lock record for the given resource, host, and owner UUID; the expiration
// time is computed by the DB as "now" plus expireOffset seconds.
1590 private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1591 throws SQLException {
1592 try (PreparedStatement stmt =
1593 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1594 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1596 stmt.setString(1, resourceId);
1597 stmt.setString(2, hostName);
1598 stmt.setString(3, uuidString);
1599 stmt.setInt(4, expireOffset);
// exactly one row must have been inserted, otherwise the test fails here
1601 assertEquals(1, stmt.executeUpdate());
1606 * Updates a record in the DB.
1608 * @param resourceId ID of the resource of interest
1609 * @param newUuid UUID string of the <i>new</i> owner
1610 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1611 * @throws SQLException if an error occurs accessing the DB
1613 private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
// newHost is the host name to store in the record's "host" column; it replaces the
// current value, just as newUuid replaces the current owner
1614 try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1615 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1617 stmt.setString(1, newHost);
1618 stmt.setString(2, newUuid);
1619 stmt.setInt(3, expireOffset);
1620 stmt.setString(4, resourceId);
// exactly one row must match the resource ID, otherwise the test fails here
1622 assertEquals(1, stmt.executeUpdate());
1627 * Feature that uses <i>exsvc</i> to execute requests.
1629 private class MyLockingFeature extends DistributedLockManager {
1631 public MyLockingFeature(boolean init) {
// replace the engine's executor with a mock so that the helper methods can capture
// the queued actions; every schedule() call on the mock returns the "checker" object
1634 exsvc = mock(ScheduledExecutorService.class);
1635 when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1636 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
// NOTE(review): presumably only performed when "init" is true - the guard is not
// visible here
1639 beforeCreateLockManager(engine, new Properties());
1647 * Feature whose data source always throws exceptions.
1649 private class InvalidDbLockingFeature extends MyLockingFeature {
// NOTE(review): presumably selects which of the two exceptions makeEx() builds - the
// selecting condition is not visible here
1650 private boolean isTransient;
1651 private boolean freeLock = false;
1653 public InvalidDbLockingFeature(boolean isTransient) {
1654 // pass "false" because we have to set the error code BEFORE calling
1658 this.isTransient = isTransient;
// with isTransient recorded, run the start-up sequence explicitly
1660 this.beforeCreateLockManager(engine, new Properties());
1662 this.afterStart(engine);
1666 protected BasicDataSource makeDataSource() throws Exception {
// stub the data source: getConnection() is answered by the lambda below, and
// close() throws the exception built by makeEx()
1667 when(datasrc.getConnection()).thenAnswer(answer -> {
1676 doThrow(makeEx()).when(datasrc).close();
1681 protected SQLException makeEx() {
// two variants: an SQLException whose cause is an SQLTransientException, or a plain
// SQLException; both carry the EXPECTED_EXCEPTION message
1683 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1686 return new SQLException(EXPECTED_EXCEPTION);
1692 * Feature whose locks free themselves while free() is already running.
1694 private class FreeWithFreeLockingFeature extends MyLockingFeature {
// NOTE(review): presumably selects whether the resource is locked again after being
// released - the code that reads this flag is not visible here
1695 private boolean relock;
1697 public FreeWithFreeLockingFeature(boolean relock) {
1699 this.relock = relock;
1703 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1704 LockCallback callback) {
// hand out an anonymous DistributedLock subclass that overrides isUnavailable()
1706 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1707 private static final long serialVersionUID = 1L;
// NOTE(review): presumably limits the extra behavior to the first isUnavailable() call
1708 private boolean checked = false;
1711 public boolean isUnavailable() {
1713 return super.isUnavailable();
1718 // release and relock
// creates another lock on RESOURCE with this lock's owner key and a throw-away callback
1726 createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1736 * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1737 * extend it, and then unlock it.
1739 private class MyThread extends Thread {
1740 AssertionError err = null;
1749 for (int x = 0; x < MAX_LOOPS; ++x) {
1753 } catch (AssertionError e) {
1758 private void makeAttempt() {
1760 Semaphore sem = new Semaphore(0);
1762 LockCallback cb = new LockCallback() {
1764 public void lockAvailable(Lock lock) {
1769 public void lockUnavailable(Lock lock) {
1774 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1776 // wait for callback, whether available or unavailable
1777 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1778 if (!lock.isActive()) {
1782 nsuccesses.incrementAndGet();
1784 assertEquals(1, nactive.incrementAndGet());
1786 lock.extend(HOLD_SEC2, cb);
1787 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1788 assertTrue(lock.isActive());
1790 // decrement BEFORE free()
1791 nactive.decrementAndGet();
1793 assertTrue(lock.free());
1794 assertTrue(lock.isUnavailable());
1796 } catch (InterruptedException e) {
1797 Thread.currentThread().interrupt();
1798 throw new AssertionError("interrupted", e);