2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNotNull;
30 import static org.junit.Assert.assertNotSame;
31 import static org.junit.Assert.assertNull;
32 import static org.junit.Assert.assertSame;
33 import static org.junit.Assert.assertTrue;
34 import static org.mockito.ArgumentMatchers.any;
35 import static org.mockito.ArgumentMatchers.anyBoolean;
36 import static org.mockito.ArgumentMatchers.anyLong;
37 import static org.mockito.ArgumentMatchers.eq;
38 import static org.mockito.Mockito.doThrow;
39 import static org.mockito.Mockito.mock;
40 import static org.mockito.Mockito.never;
41 import static org.mockito.Mockito.times;
42 import static org.mockito.Mockito.verify;
43 import static org.mockito.Mockito.when;
45 import java.io.ByteArrayInputStream;
46 import java.io.ByteArrayOutputStream;
47 import java.io.ObjectInputStream;
48 import java.io.ObjectOutputStream;
49 import java.sql.Connection;
50 import java.sql.DriverManager;
51 import java.sql.PreparedStatement;
52 import java.sql.ResultSet;
53 import java.sql.SQLException;
54 import java.sql.SQLTransientException;
55 import java.util.ArrayList;
56 import java.util.List;
57 import java.util.Properties;
58 import java.util.concurrent.Executors;
59 import java.util.concurrent.RejectedExecutionException;
60 import java.util.concurrent.ScheduledExecutorService;
61 import java.util.concurrent.ScheduledFuture;
62 import java.util.concurrent.Semaphore;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.atomic.AtomicBoolean;
65 import java.util.concurrent.atomic.AtomicInteger;
66 import java.util.concurrent.atomic.AtomicReference;
67 import org.apache.commons.dbcp2.BasicDataSource;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.junit.runner.RunWith;
74 import org.kie.api.runtime.KieSession;
75 import org.mockito.ArgumentCaptor;
76 import org.mockito.Mock;
77 import org.mockito.junit.MockitoJUnitRunner;
78 import org.onap.policy.common.utils.services.OrderedServiceImpl;
79 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
80 import org.onap.policy.drools.core.PolicySession;
81 import org.onap.policy.drools.core.lock.Lock;
82 import org.onap.policy.drools.core.lock.LockCallback;
83 import org.onap.policy.drools.core.lock.LockState;
84 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
85 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
86 import org.onap.policy.drools.system.PolicyEngine;
87 import org.onap.policy.drools.system.PolicyEngineConstants;
88 import org.powermock.reflect.Whitebox;
90 @RunWith(MockitoJUnitRunner.class)
91 public class DistributedLockManagerTest {
92 private static final long EXPIRE_SEC = 900L;
93 private static final long RETRY_SEC = 60L;
94 private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
95 private static final String OTHER_HOST = "other-host";
96 private static final String OTHER_OWNER = "other-owner";
97 private static final String EXPECTED_EXCEPTION = "expected exception";
98 private static final String DB_CONNECTION =
99 "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
100 private static final String DB_USER = "user";
101 private static final String DB_PASSWORD = "password";
102 private static final String OWNER_KEY = "my key";
103 private static final String RESOURCE = "my resource";
104 private static final String RESOURCE2 = "my resource #2";
105 private static final String RESOURCE3 = "my resource #3";
106 private static final String RESOURCE4 = "my resource #4";
107 private static final String RESOURCE5 = "my resource #5";
108 private static final int HOLD_SEC = 100;
109 private static final int HOLD_SEC2 = 120;
110 private static final int MAX_THREADS = 5;
111 private static final int MAX_LOOPS = 100;
112 private static final boolean TRANSIENT = true;
113 private static final boolean PERMANENT = false;
115 // number of execute() calls before the first lock attempt
116 private static final int PRE_LOCK_EXECS = 1;
118 // number of execute() calls before the first schedule attempt
119 private static final int PRE_SCHED_EXECS = 1;
121 private static Connection conn = null;
122 private static ScheduledExecutorService saveExec;
123 private static ScheduledExecutorService realExec;
126 private PolicyEngine engine;
129 private KieSession kieSess;
132 private ScheduledExecutorService exsvc;
135 private ScheduledFuture<?> checker;
138 private LockCallback callback;
141 private BasicDataSource datasrc;
143 private DistributedLock lock;
144 private PolicySession session;
146 private AtomicInteger nactive;
147 private AtomicInteger nsuccesses;
148 private DistributedLockManager feature;
152 * Configures the location of the property files and creates the DB.
154 * @throws SQLException if the DB cannot be created
// One-time class set-up: configures the persistence directory, creates the lock table and prepares executors.
157 public static void setUpBeforeClass() throws SQLException {
// point the persistence manager at the test resources directory
158 SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
// open the shared connection to the in-memory H2 database named in DB_CONNECTION
160 conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
// create the lock table, keyed by resourceId
162 try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
163 + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
164 + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
165 createStmt.executeUpdate();
// remember the engine's current executor; tearDownAfterClass() writes it back
168 saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
// a real scheduled thread pool with three threads
170 realExec = Executors.newScheduledThreadPool(3);
174 * Restores static fields.
177 public static void tearDownAfterClass() throws SQLException {
178 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
184 * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
187 * @throws SQLException if the lock records cannot be deleted from the DB
190 public void setUp() throws SQLException {
191 // grant() and deny() calls will come through here and be immediately executed
192 session = new PolicySession(null, null, kieSess) {
194 public void insertDrools(Object object) {
195 ((Runnable) object).run();
199 session.setPolicySession();
201 nactive = new AtomicInteger(0);
202 nsuccesses = new AtomicInteger(0);
206 feature = new MyLockingFeature(true);
210 public void tearDown() throws SQLException {
// Deletes every row from the pooling.locks table using the shared connection.
215 private void cleanDb() throws SQLException {
216 try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
217 stmt.executeUpdate();
// Invokes afterStop() on the current feature, when one exists.
221 private void shutdownFeature() {
222 if (feature != null) {
223 feature.afterStop(engine);
229 * Tests that the feature is found in the expected service sets.
232 public void testServiceApis() {
233 assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
234 .anyMatch(obj -> obj instanceof DistributedLockManager));
238 public void testGetSequenceNumber() {
239 assertEquals(1000, feature.getSequenceNumber());
243 public void testBeforeCreateLockManager() {
244 assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
248 * Tests beforeCreate(), when getProperties() throws a runtime exception.
251 public void testBeforeCreateLockManagerEx() {
254 feature = new MyLockingFeature(false) {
256 protected Properties getProperties(String fileName) {
257 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
261 Properties props = new Properties();
262 assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, props))
263 .isInstanceOf(DistributedLockManagerException.class);
267 public void testAfterStart() {
268 // verify that cleanup & expire check are both added to the queue
269 verify(exsvc).execute(any());
270 verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
274 * Tests afterStart(), when thread pool throws a runtime exception.
277 public void testAfterStartExInThreadPool() {
280 feature = new MyLockingFeature(false);
282 doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
284 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
// Verifies that the captured clean-up task removes expired lock records and keeps the unexpired ones.
288 public void testDeleteExpiredDbLocks() throws SQLException {
289 // add records: two expired (RESOURCE, RESOURCE3), two not (RESOURCE2, RESOURCE4)
290 insertRecord(RESOURCE, feature.getUuidString(), -1);
291 insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
292 insertRecord(RESOURCE3, OTHER_OWNER, 0);
293 insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
295 // get the clean-up function and execute it
296 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
297 verify(exsvc).execute(captor.capture());
299 long tbegin = System.currentTimeMillis();
300 Runnable action = captor.getValue();
// the expired records should be gone, regardless of owner; the others should be unchanged
303 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
304 assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
305 assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
306 assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
// only the two unexpired records remain
308 assertEquals(2, getRecordCount());
312 * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
314 * @throws SQLException if an error occurs
317 public void testDeleteExpiredDbLocksEx() throws SQLException {
318 feature = new InvalidDbLockingFeature(TRANSIENT);
320 // get the clean-up function and execute it
321 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
322 verify(exsvc).execute(captor.capture());
324 Runnable action = captor.getValue();
326 // should not throw an exception
331 public void testAfterStop() {
333 verify(checker).cancel(anyBoolean());
335 feature = new DistributedLockManager();
337 // shutdown without calling afterStart()
343 * Tests afterStop(), when the data source throws an exception when close() is called.
345 * @throws SQLException if an error occurs
348 public void testAfterStopEx() throws SQLException {
351 // use a data source that throws an exception when closed
352 feature = new InvalidDbLockingFeature(TRANSIENT);
354 assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
// Verifies lock creation: the first request for a resource waits, later requests for the same
// resource are denied immediately, and a request for a different resource is accepted.
358 public void testCreateLock() throws SQLException {
// exactly one execute() call has been made before any lock is requested
359 verify(exsvc).execute(any());
361 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
362 assertTrue(lock.isWaiting());
// requesting the lock queued exactly one more task
364 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
366 // this lock should fail
367 LockCallback callback2 = mock(LockCallback.class);
368 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
369 assertTrue(lock2.isUnavailable());
370 verify(callback2, never()).lockAvailable(lock2);
371 verify(callback2).lockUnavailable(lock2);
373 // this should fail, too
374 LockCallback callback3 = mock(LockCallback.class);
375 DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
376 assertTrue(lock3.isUnavailable());
377 verify(callback3, never()).lockAvailable(lock3);
378 verify(callback3).lockUnavailable(lock3);
380 // no change to first
381 assertTrue(lock.isWaiting());
383 // no callbacks to the first lock
384 verify(callback, never()).lockAvailable(lock);
385 verify(callback, never()).lockUnavailable(lock);
// nothing has been written to the DB while the lock is still waiting
387 assertTrue(lock.isWaiting());
388 assertEquals(0, getRecordCount());
// once the lock is active, its record exists and the callback has been notified
391 assertTrue(lock.isActive());
392 assertEquals(1, getRecordCount());
394 verify(callback).lockAvailable(lock);
395 verify(callback, never()).lockUnavailable(lock);
397 // this should succeed
398 DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
399 assertTrue(lock4.isWaiting());
401 // after running checker, original records should still remain
402 runChecker(0, 0, EXPIRE_SEC);
403 assertEquals(1, getRecordCount());
404 verify(callback, never()).lockUnavailable(lock);
408 * Tests createLock() when the feature is not the latest instance.
411 public void testCreateLockNotLatestInstance() {
412 DistributedLockManager.setLatestInstance(null);
414 Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
415 assertTrue(lock.isUnavailable());
416 verify(callback, never()).lockAvailable(any());
417 verify(callback).lockUnavailable(lock);
// Verifies that the expiration checker marks locks unavailable when their DB records no longer
// match (expired, different host, or different owner) and leaves the matching locks active.
421 public void testCheckExpired() throws SQLException {
// request five locks, one per resource, each with its own callback
422 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
425 LockCallback callback2 = mock(LockCallback.class);
426 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
429 LockCallback callback3 = mock(LockCallback.class);
430 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
433 LockCallback callback4 = mock(LockCallback.class);
434 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
437 LockCallback callback5 = mock(LockCallback.class);
438 final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
// all five locks have a record in the DB
441 assertEquals(5, getRecordCount());
// keep host and owner, but pass -1 as the last argument
// (presumably an already-elapsed expiration - confirm against updateRecord)
444 updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
446 // change host of another record
447 updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
449 // change uuid of another record
450 updateRecord(RESOURCE5, feature.getPdpName(), OTHER_OWNER, HOLD_SEC);
454 runChecker(0, 0, EXPIRE_SEC);
// locks whose records were modified above are now unavailable; the untouched ones stay active
458 assertTrue(lock.isUnavailable());
459 assertTrue(lock2.isActive());
460 assertTrue(lock3.isUnavailable());
461 assertTrue(lock4.isActive());
462 assertTrue(lock5.isUnavailable());
// only the callbacks of the lost locks are told the lock is unavailable
468 verify(callback).lockUnavailable(lock);
469 verify(callback3).lockUnavailable(lock3);
470 verify(callback5).lockUnavailable(lock5);
472 verify(callback2, never()).lockUnavailable(lock2);
473 verify(callback4, never()).lockUnavailable(lock4);
476 // another check should have been scheduled, with the normal interval
477 runChecker(1, 0, EXPIRE_SEC);
481 * Tests checkExpired(), when schedule() throws an exception.
484 public void testCheckExpiredExecRejected() {
485 // arrange for execution to be rejected
486 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
487 .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
489 runChecker(0, 0, EXPIRE_SEC);
493 * Tests checkExpired(), when getConnection() throws an exception.
496 public void testCheckExpiredSqlEx() {
497 // use a data source that throws an exception when getConnection() is called
498 feature = new InvalidDbLockingFeature(TRANSIENT);
500 runChecker(0, 0, EXPIRE_SEC);
502 // it should have scheduled another check, sooner
503 runChecker(0, 0, RETRY_SEC);
507 * Tests checkExpired(), when getConnection() throws an exception and the feature is
511 public void testCheckExpiredSqlExFeatureStopped() {
512 // use a data source that throws an exception when getConnection() is called
513 feature = new InvalidDbLockingFeature(TRANSIENT) {
515 protected SQLException makeEx() {
517 return super.makeEx();
521 runChecker(0, 0, EXPIRE_SEC);
523 // it should NOT have scheduled another check
524 verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
// Exercises the expiration checker with locks in several states: one modified in the DB,
// one untouched, one left waiting, and one taken from freeLock while a connection is obtained.
528 public void testExpireLocks() throws SQLException {
// holds a lock that the getConnection() answer below takes (and clears) when invoked
529 AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
531 feature = new MyLockingFeature(true) {
533 protected BasicDataSource makeDataSource() throws Exception {
534 // get the real data source
535 BasicDataSource src2 = super.makeDataSource();
537 when(datasrc.getConnection()).thenAnswer(answer -> {
538 DistributedLock lck = freeLock.getAndSet(null);
547 return src2.getConnection();
554 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
557 LockCallback callback2 = mock(LockCallback.class);
558 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
561 LockCallback callback3 = mock(LockCallback.class);
562 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
563 // don't run doLock for lock3 - leave it in the waiting state
565 LockCallback callback4 = mock(LockCallback.class);
566 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
// lock3 was never run, so only three of the four locks have records
569 assertEquals(3, getRecordCount());
// keep host and owner, but pass -1 as the last argument
// (presumably an already-elapsed expiration - confirm against updateRecord)
572 updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
574 // arrange to free lock4 while the checker is running
578 runChecker(0, 0, EXPIRE_SEC);
// lock and lock4 are no longer held; lock2 is still active and lock3 is still waiting
582 assertTrue(lock.isUnavailable());
583 assertTrue(lock2.isActive());
584 assertTrue(lock3.isWaiting());
585 assertTrue(lock4.isUnavailable());
588 verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
// only the first lock's callback is notified that the lock is unavailable
590 verify(callback).lockUnavailable(lock);
591 verify(callback2, never()).lockUnavailable(lock2);
592 verify(callback3, never()).lockUnavailable(lock3);
593 verify(callback4, never()).lockUnavailable(lock4);
597 public void testDistributedLockNoArgs() {
598 DistributedLock lock = new DistributedLock();
599 assertNull(lock.getResourceId());
600 assertNull(lock.getOwnerKey());
601 assertNull(lock.getCallback());
602 assertEquals(0, lock.getHoldSec());
606 public void testDistributedLock() {
607 assertThatIllegalArgumentException()
608 .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
609 .withMessageContaining("holdSec");
611 // should generate no exception
612 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
616 public void testDistributedLockSerializable() throws Exception {
617 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
618 lock = roundTrip(lock);
620 assertTrue(lock.isWaiting());
622 assertEquals(RESOURCE, lock.getResourceId());
623 assertEquals(OWNER_KEY, lock.getOwnerKey());
624 assertNull(lock.getCallback());
625 assertEquals(HOLD_SEC, lock.getHoldSec());
629 public void testGrant() {
630 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
631 assertFalse(lock.isActive());
633 // execute the doLock() call
636 assertTrue(lock.isActive());
638 // the callback for the lock should have been run in the foreground thread
639 verify(callback).lockAvailable(lock);
// Verifies that a second lock request for an already-requested resource is denied immediately.
643 public void testDistributedLockDeny() {
// first request for the resource
645 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
647 // get another lock - should fail
648 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
650 assertTrue(lock.isUnavailable());
652 // the callback for the second lock should have been run in the foreground thread
653 verify(callback).lockUnavailable(lock);
655 // should only have a request for the first lock
656 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
// Verifies free(): the first call succeeds and makes the lock unavailable, a repeated call
// returns false, and the resource can then be requested again with a new lock.
660 public void testDistributedLockFree() {
661 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
663 assertTrue(lock.free());
664 assertTrue(lock.isUnavailable());
666 // run both requests associated with the lock
670 // should not have changed state
671 assertTrue(lock.isUnavailable());
673 // attempt to free it again
674 assertFalse(lock.free());
676 // should not have queued anything else
677 verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
679 // new lock should succeed
680 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
681 assertNotSame(lock2, lock);
682 assertTrue(lock2.isWaiting());
686 * Tests that free() works on a serialized lock with a new feature.
688 * @throws Exception if an error occurs
691 public void testDistributedLockFreeSerialized() throws Exception {
692 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
694 feature = new MyLockingFeature(true);
696 lock = roundTrip(lock);
697 assertTrue(lock.free());
698 assertTrue(lock.isUnavailable());
702 * Tests free() on a serialized lock without a feature.
704 * @throws Exception if an error occurs
707 public void testDistributedLockFreeNoFeature() throws Exception {
708 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
710 DistributedLockManager.setLatestInstance(null);
712 lock = roundTrip(lock);
713 assertFalse(lock.free());
714 assertTrue(lock.isUnavailable());
718 * Tests the case where the lock is freed and doUnlock called between the call to
719 * isUnavailable() and the call to compute().
722 public void testDistributedLockFreeUnlocked() {
723 feature = new FreeWithFreeLockingFeature(true);
725 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
727 assertFalse(lock.free());
728 assertTrue(lock.isUnavailable());
732 * Tests the case where the lock is freed, but doUnlock is not completed, between the
733 * call to isUnavailable() and the call to compute().
736 public void testDistributedLockFreeLockFreed() {
737 feature = new FreeWithFreeLockingFeature(false);
739 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
741 assertFalse(lock.free());
742 assertTrue(lock.isUnavailable());
746 public void testDistributedLockExtend() {
747 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
749 // lock2 should be denied - called back by this thread
750 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
751 verify(callback, never()).lockAvailable(lock2);
752 verify(callback).lockUnavailable(lock2);
754 // lock2 will still be denied - called back by this thread
755 lock2.extend(HOLD_SEC, callback);
756 verify(callback, times(2)).lockUnavailable(lock2);
758 // force lock2 to be active - should still be denied
759 Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
760 lock2.extend(HOLD_SEC, callback);
761 verify(callback, times(3)).lockUnavailable(lock2);
763 assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
764 .withMessageContaining("holdSec");
768 assertTrue(lock.isActive());
770 // now extend the first lock
771 LockCallback callback2 = mock(LockCallback.class);
772 lock.extend(HOLD_SEC2, callback2);
773 assertTrue(lock.isWaiting());
775 // execute doExtend()
777 lock.extend(HOLD_SEC2, callback2);
778 assertEquals(HOLD_SEC2, lock.getHoldSec());
779 verify(callback2).lockAvailable(lock);
780 verify(callback2, never()).lockUnavailable(lock);
784 * Tests that extend() works on a serialized lock with a new feature.
786 * @throws Exception if an error occurs
789 public void testDistributedLockExtendSerialized() throws Exception {
790 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
794 assertTrue(lock.isActive());
796 feature = new MyLockingFeature(true);
798 lock = roundTrip(lock);
799 assertTrue(lock.isActive());
801 LockCallback scallback = mock(LockCallback.class);
803 lock.extend(HOLD_SEC, scallback);
804 assertTrue(lock.isWaiting());
806 // run doExtend (in new feature)
808 assertTrue(lock.isActive());
810 verify(scallback).lockAvailable(lock);
811 verify(scallback, never()).lockUnavailable(lock);
815 * Tests extend() on a serialized lock without a feature.
817 * @throws Exception if an error occurs
820 public void testDistributedLockExtendNoFeature() throws Exception {
821 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
825 assertTrue(lock.isActive());
827 DistributedLockManager.setLatestInstance(null);
829 lock = roundTrip(lock);
830 assertTrue(lock.isActive());
832 LockCallback scallback = mock(LockCallback.class);
834 lock.extend(HOLD_SEC, scallback);
835 assertTrue(lock.isUnavailable());
837 verify(scallback, never()).lockAvailable(lock);
838 verify(scallback).lockUnavailable(lock);
842 * Tests the case where the lock is freed and doUnlock called between the call to
843 * isUnavailable() and the call to compute().
846 public void testDistributedLockExtendUnlocked() {
847 feature = new FreeWithFreeLockingFeature(true);
849 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
851 lock.extend(HOLD_SEC2, callback);
852 assertTrue(lock.isUnavailable());
853 verify(callback).lockUnavailable(lock);
857 * Tests the case where the lock is freed, but doUnlock is not completed, between the
858 * call to isUnavailable() and the call to compute().
861 public void testDistributedLockExtendLockFreed() {
862 feature = new FreeWithFreeLockingFeature(false);
864 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
866 lock.extend(HOLD_SEC2, callback);
867 assertTrue(lock.isUnavailable());
868 verify(callback).lockUnavailable(lock);
872 public void testDistributedLockScheduleRequest() {
873 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
876 verify(callback).lockAvailable(lock);
// Verifies that a failed lock request is rescheduled and that freeing the lock during the
// retry leaves it unavailable without notifying the callback.
880 public void testDistributedLockRescheduleRequest() throws SQLException {
881 // use a data source that throws an exception when getConnection() is called
882 InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
885 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
887 // invoke doLock - should fail and reschedule
890 // should still be waiting
891 assertTrue(lock.isWaiting());
892 verify(callback, never()).lockUnavailable(lock);
894 // free the lock while doLock is executing
895 invfeat.freeLock = true;
897 // try scheduled request - should just invoke doUnlock
900 // should now be unavailable, but the callback should still not have been invoked
901 assertTrue(lock.isUnavailable());
902 verify(callback, never()).lockUnavailable(lock);
904 // should have scheduled a retry of doUnlock
905 verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
909 public void testDistributedLockGetNextRequest() {
910 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
913 * run doLock. This should cause getNextRequest() to be called twice, once with a
914 * request in the queue, and the second time with request=null.
920 * Tests getNextRequest(), where the same request is still in the queue the second
924 public void testDistributedLockGetNextRequestSameRequest() {
925 // force reschedule to be invoked
926 feature = new InvalidDbLockingFeature(TRANSIENT);
928 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
931 * run doLock. This should cause getNextRequest() to be called twice, once with a
932 * request in the queue, and the second time with the same request again.
936 verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
940 public void testDistributedLockDoRequest() throws SQLException {
941 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
943 assertTrue(lock.isWaiting());
945 // run doLock via doRequest
948 assertTrue(lock.isActive());
952 * Tests doRequest(), when doRequest() is already running within another thread.
955 public void testDistributedLockDoRequestBusy() {
957 * this feature will invoke a request in a background thread while it's being run
958 * in a foreground thread.
960 AtomicBoolean running = new AtomicBoolean(false);
961 AtomicBoolean returned = new AtomicBoolean(false);
963 feature = new MyLockingFeature(true) {
965 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
966 LockCallback callback) {
967 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
968 private static final long serialVersionUID = 1L;
971 protected boolean doDbInsert(Connection conn) throws SQLException {
973 // already inside the thread - don't recurse any further
974 return super.doDbInsert(conn);
979 Thread thread = new Thread(() -> {
980 // run doLock from within the new thread
983 thread.setDaemon(true);
986 // wait for the background thread to complete before continuing
989 } catch (InterruptedException ignore) {
990 Thread.currentThread().interrupt();
993 returned.set(!thread.isAlive());
995 return super.doDbInsert(conn);
1001 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1006 assertTrue(returned.get());
1010 * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1012 * @throws SQLException if an error occurs
1015 public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1016 // throw run-time exception
1017 when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1019 // use a data source that throws an exception when getConnection() is called
1020 feature = new MyLockingFeature(true) {
1022 protected BasicDataSource makeDataSource() throws Exception {
1027 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1029 // invoke doLock - should NOT reschedule
1032 assertTrue(lock.isUnavailable());
1033 verify(callback).lockUnavailable(lock);
1035 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1039 * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1042 * @throws SQLException if an error occurs
1045 public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1046 // throw run-time exception
1047 when(datasrc.getConnection()).thenAnswer(answer -> {
1049 throw new IllegalStateException(EXPECTED_EXCEPTION);
1052 // use a data source that throws an exception when getConnection() is called
1053 feature = new MyLockingFeature(true) {
1055 protected BasicDataSource makeDataSource() throws Exception {
1060 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1062 // invoke doLock - should NOT reschedule
1065 assertTrue(lock.isUnavailable());
1066 verify(callback, never()).lockUnavailable(lock);
1068 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1072 * Tests doRequest() when the retry count gets exhausted.
// Verifies that the lock stays waiting while failed attempts are retried and becomes
// unavailable, with a single callback, only after the final retry fails.
1075 public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1076 // use a data source that throws an exception when getConnection() is called
1077 feature = new InvalidDbLockingFeature(TRANSIENT);
1079 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1081 // invoke doLock - should fail and reschedule
1084 // should still be waiting
1085 assertTrue(lock.isWaiting());
1086 verify(callback, never()).lockUnavailable(lock);
1088 // try again, via SCHEDULER - first retry fails
1091 // should still be waiting
1092 assertTrue(lock.isWaiting());
1093 verify(callback, never()).lockUnavailable(lock);
1095 // try again, via SCHEDULER - final retry fails
1097 assertTrue(lock.isUnavailable());
1099 // now callback should have been called
1100 verify(callback).lockUnavailable(lock);
1104 * Tests doRequest() when a non-transient DB exception is thrown.
// Verifies that a PERMANENT DB failure makes the lock unavailable at once, with no retry scheduled.
1107 public void testDistributedLockDoRequestNotTransient() {
1109 * use a data source that throws a PERMANENT exception when getConnection() is
1112 feature = new InvalidDbLockingFeature(PERMANENT);
1114 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1116 // invoke doLock - should fail
1119 assertTrue(lock.isUnavailable());
1120 verify(callback).lockUnavailable(lock);
1122 // should not have scheduled anything new
1123 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1124 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
// Checks the successful doLock() path: one DB record with an in-range expiration
// time, and the callback is told that the lock is available.
1128 public void testDistributedLockDoLock() throws SQLException {
1129 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1131 // invoke doLock - should simply do an insert
// captured so that recordInRange() can bound when the record was written
1132 long tbegin = System.currentTimeMillis();
1135 assertEquals(1, getRecordCount());
1136 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1137 verify(callback).lockAvailable(lock);
1141 * Tests doLock() when the lock is freed before doLock runs.
1143 * @throws SQLException if an error occurs
1146 public void testDistributedLockDoLockFreed() throws SQLException {
1147 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// force the lock into the UNAVAILABLE state before doLock gets a chance to run
1149 lock.setState(LockState.UNAVAILABLE);
1151 // invoke doLock - should do nothing
// nothing may have been written to the DB
1154 assertEquals(0, getRecordCount());
// and the callback must not have been told that the lock is available
1156 verify(callback, never()).lockAvailable(lock);
1160 * Tests doLock() when a DB exception is thrown.
1163 public void testDistributedLockDoLockEx() {
1164 // use a data source that throws an exception when getConnection() is called
1165 feature = new InvalidDbLockingFeature(PERMANENT);
1167 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1169 // invoke doLock - should fail, as the data source throws an exception
1172 // lock should have failed due to exception
1173 verify(callback).lockUnavailable(lock);
1177 * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1181 public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1182 // insert an expired record (expiration offset of 0 seconds from "now")
1183 insertRecord(RESOURCE, feature.getUuidString(), 0);
1185 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1187 // invoke doLock - should simply do an update
// the lock should have been granted despite the pre-existing record
1189 verify(callback).lockAvailable(lock);
1193 * Tests doLock() when a locked record already exists.
1196 public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1197 // insert an unexpired record that is held by a different owner
1198 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1200 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1205 // lock should have failed because it's already locked
1206 verify(callback).lockUnavailable(lock);
// Checks the successful doUnlock() path: the DB record is removed, the lock becomes
// unavailable, and no callbacks occur beyond the original lockAvailable().
1210 public void testDistributedLockDoUnlock() throws SQLException {
1211 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1218 // invoke doUnlock()
1219 long tbegin = System.currentTimeMillis();
// the record must be gone from the DB
1222 assertEquals(0, getRecordCount());
1223 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1225 assertTrue(lock.isUnavailable());
1227 // no more callbacks should have occurred
1228 verify(callback, times(1)).lockAvailable(lock);
1229 verify(callback, never()).lockUnavailable(lock);
1233 * Tests doUnlock() when a DB exception is thrown.
1235 * @throws SQLException if an error occurs
1238 public void testDistributedLockDoUnlockEx() throws SQLException {
// this feature's data source fails on getConnection(), so every DB operation fails
1239 feature = new InvalidDbLockingFeature(PERMANENT);
1241 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1243 // do NOT invoke doLock() - it will fail without a DB connection
1247 // invoke doUnlock()
// the lock ends up unavailable even though the DB operation could not succeed
1250 assertTrue(lock.isUnavailable());
1252 // no more callbacks should have occurred
1253 verify(callback, never()).lockAvailable(lock);
1254 verify(callback, never()).lockUnavailable(lock);
// Checks the successful doExtend() path: the single DB record reflects the new hold
// time (HOLD_SEC2), the lock stays active, and only the extension callback is notified.
1258 public void testDistributedLockDoExtend() throws SQLException {
1259 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// a separate callback, so extension notifications can be told apart from the original ones
1262 LockCallback callback2 = mock(LockCallback.class);
1263 lock.extend(HOLD_SEC2, callback2);
1266 long tbegin = System.currentTimeMillis();
1269 assertEquals(1, getRecordCount());
1270 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1272 assertTrue(lock.isActive());
1274 // no more callbacks should have occurred
1275 verify(callback).lockAvailable(lock);
1276 verify(callback, never()).lockUnavailable(lock);
1278 // extension should have succeeded
1279 verify(callback2).lockAvailable(lock);
1280 verify(callback2, never()).lockUnavailable(lock);
1284 * Tests doExtend() when the lock is freed before doExtend runs.
1286 * @throws SQLException if an error occurs
1289 public void testDistributedLockDoExtendFreed() throws SQLException {
1290 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1291 lock.extend(HOLD_SEC2, callback);
// force the lock into the UNAVAILABLE state before doExtend gets a chance to run
1293 lock.setState(LockState.UNAVAILABLE);
1295 // invoke doExtend - should do nothing
// no record may exist in the DB, and the callback must not have been notified
1298 assertEquals(0, getRecordCount());
1300 verify(callback, never()).lockAvailable(lock);
1304 * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1307 * @throws SQLException if an error occurs
1310 public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1311 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// a separate callback, so extension notifications can be told apart from the original ones
1314 LockCallback callback2 = mock(LockCallback.class);
1315 lock.extend(HOLD_SEC2, callback2);
1317 // delete the record so it's forced to re-insert it
1321 long tbegin = System.currentTimeMillis();
// the record must be back in the DB, with the extended hold time
1324 assertEquals(1, getRecordCount());
1325 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1327 assertTrue(lock.isActive());
1329 // no more callbacks should have occurred
1330 verify(callback).lockAvailable(lock);
1331 verify(callback, never()).lockUnavailable(lock);
1333 // extension should have succeeded
1334 verify(callback2).lockAvailable(lock);
1335 verify(callback2, never()).lockUnavailable(lock);
1339 * Tests doExtend() when both update and insert fail.
1341 * @throws SQLException if an error occurs
1344 public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1346 * this feature will create a lock that returns false when doDbUpdate() is
1347 * invoked, or when doDbInsert() is invoked a second time
1349 feature = new MyLockingFeature(true) {
1351 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1352 LockCallback callback) {
1353 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1354 private static final long serialVersionUID = 1L;
// presumably counts doDbInsert() invocations so the second one can be failed - confirm against the full file
1355 private int ntimes = 0;
1358 protected boolean doDbInsert(Connection conn) throws SQLException {
1363 return super.doDbInsert(conn);
1367 protected boolean doDbUpdate(Connection conn) {
1374 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// a separate callback, so extension notifications can be told apart from the original ones
1377 LockCallback callback2 = mock(LockCallback.class);
1378 lock.extend(HOLD_SEC2, callback2);
// with both update and insert failing, the lock should be lost
1383 assertTrue(lock.isUnavailable());
1385 // no more callbacks should have occurred
1386 verify(callback).lockAvailable(lock);
1387 verify(callback, never()).lockUnavailable(lock);
1389 // extension should have failed
1390 verify(callback2, never()).lockAvailable(lock);
1391 verify(callback2).lockUnavailable(lock);
1395 * Tests doExtend() when an exception occurs.
1397 * @throws SQLException if an error occurs
1400 public void testDistributedLockDoExtendEx() throws SQLException {
1401 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1405 * delete the record and insert one with a different owner, which will cause
1406 * doDbInsert() to throw an exception
1409 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
// a separate callback, so extension notifications can be told apart from the original ones
1411 LockCallback callback2 = mock(LockCallback.class);
1412 lock.extend(HOLD_SEC2, callback2);
// the exception during the extension should cost us the lock
1417 assertTrue(lock.isUnavailable());
1419 // no more callbacks should have occurred
1420 verify(callback).lockAvailable(lock);
1421 verify(callback, never()).lockUnavailable(lock);
1423 // extension should have failed
1424 verify(callback2, never()).lockAvailable(lock);
1425 verify(callback2).lockUnavailable(lock);
// Checks that a lock's toString() yields a non-null string that mentions neither
// "ownerInfo" nor "callback".
1429 public void testDistributedLockToString() {
1430 String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1431 assertNotNull(text);
1432 assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
// Starts an unmodified DistributedLockManager and checks that it can be shut down
// without throwing.
1436 public void testMakeThreadPool() {
1437 // use a REAL feature to test this
1438 feature = new DistributedLockManager();
1440 // this should create a thread pool
1441 feature.beforeCreateLockManager(engine, new Properties());
1442 feature.afterStart(engine);
// shutting the feature down must complete without any exception
1444 assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
1448 * Performs a multi-threaded test of the locking facility.
1450 * @throws InterruptedException if the current thread is interrupted while waiting for
1451 * the background threads to complete
1454 public void testMultiThreaded() throws InterruptedException {
// swap realExec into the policy engine manager's executor field
// (presumably a real executor, so work actually runs - confirm against the fixture set-up)
1455 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
// use an unmodified DistributedLockManager rather than one of the test subclasses
1457 feature = new DistributedLockManager();
1458 feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1459 feature.afterStart(PolicyEngineConstants.getManager());
// create all of the worker threads before starting any of them
1461 List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1462 for (int x = 0; x < MAX_THREADS; ++x) {
1463 threads.add(new MyThread());
1466 threads.forEach(Thread::start);
// every worker must have terminated
1468 for (MyThread thread : threads) {
1470 assertFalse(thread.isAlive());
// inspect any assertion failure recorded by a worker
1473 for (MyThread thread : threads) {
1474 if (thread.err != null) {
// at least one lock attempt must have succeeded
1479 assertTrue(nsuccesses.get() > 0);
// Creates a lock via the current feature's createLock() and casts the result to
// DistributedLock; all arguments are passed through unchanged.
1482 private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1483 boolean waitForLock) {
1484 return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
// Serializes the given lock into an in-memory byte array using Java object
// serialization, then deserializes it and returns the resulting copy.
1487 private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1488 ByteArrayOutputStream baos = new ByteArrayOutputStream();
// the output stream is closed before the bytes are read back
1489 try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1490 oos.writeObject(lock);
1493 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1494 try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1495 return (DistributedLock) ois.readObject();
1500 * Runs the checkExpired() action.
1502 * @param nskip number of actions in the work queue to skip
1503 * @param nadditional number of additional actions that appear in the work queue
1504 * <i>after</i> the checkExpired action
1505 * @param schedSec number of seconds for which the checker should have been scheduled
1507 private void runChecker(int nskip, int nadditional, long schedSec) {
1508 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// verifies the exact number of schedule() calls made with a delay of schedSec seconds,
// capturing the Runnable passed to each of them
1509 verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
// captured values are indexed from zero, so index nskip is the first one not skipped
1510 Runnable action = captor.getAllValues().get(nskip);
1515 * Runs a lock action (e.g., doLock, doUnlock).
1517 * @param nskip number of actions in the work queue to skip
1518 * @param nadditional number of additional actions that appear in the work queue
1519 * <i>after</i> the desired action
1521 void runLock(int nskip, int nadditional) {
1522 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// verifies the exact number of execute() calls, counting PRE_LOCK_EXECS on top of the
// skipped, desired and additional actions, and captures each Runnable
1523 verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
// step over the PRE_LOCK_EXECS entries and the skipped ones to reach the desired action
1525 Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1530 * Runs a scheduled action (e.g., "retry" action).
1532 * @param nskip number of actions in the work queue to skip
1533 * @param nadditional number of additional actions that appear in the work queue
1534 * <i>after</i> the desired action
1536 void runSchedule(int nskip, int nadditional) {
1537 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// verifies the exact number of schedule() calls, with any delay and time unit, counting
// PRE_SCHED_EXECS on top of the skipped, desired and additional actions
1538 verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
// step over the PRE_SCHED_EXECS entries and the skipped ones to reach the desired action
1540 Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1545 * Gets a count of the number of lock records in the DB.
1547 * @return the number of lock records in the DB
1548 * @throws SQLException if an error occurs accessing the DB
1550 private int getRecordCount() throws SQLException {
// the statement and its result set are both closed by the try-with-resources
1551 try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1552 ResultSet result = stmt.executeQuery()) {
// the count is the first column of the single row returned
1554 if (result.next()) {
1555 return result.getInt(1);
1564 * Determines if there is a record for the given resource whose expiration time is in
1565 * the expected range.
1567 * @param resourceId ID of the resource of interest
1568 * @param uuidString UUID string of the owner
1569 * @param holdSec seconds for which the lock was to be held
1570 * @param tbegin earliest time, in milliseconds, at which the record could have been
1571 * inserted into the DB
1572 * @return {@code true} if a record is found whose expiration time is in the expected range, {@code false} otherwise
1573 * @throws SQLException if an error occurs accessing the DB
1575 private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
// the query yields the number of seconds remaining until the record expires
1576 try (PreparedStatement stmt =
1577 conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1578 + " WHERE resourceId=? AND host=? AND owner=?")) {
1580 stmt.setString(1, resourceId);
// the host is always this feature's PDP name; only the owner UUID is supplied by the caller
1581 stmt.setString(2, feature.getPdpName());
1582 stmt.setString(3, uuidString);
1584 try (ResultSet result = stmt.executeQuery()) {
1585 if (result.next()) {
1586 int remaining = result.getInt(1);
// NOTE(review): maxDiff is in milliseconds, while (holdSec - remaining) is in seconds,
// so the upper bound below is far looser than a unit-consistent check would be - confirm intent
1587 long maxDiff = System.currentTimeMillis() - tbegin;
// in range: not yet expired, and the hold time used up so far does not exceed maxDiff
1588 return (remaining >= 0 && holdSec - remaining <= maxDiff);
1598 * Inserts a record into the DB.
1600 * @param resourceId ID of the resource of interest
1601 * @param uuidString UUID string of the owner
1602 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1603 * @throws SQLException if an error occurs accessing the DB
1605 private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
// delegates to the four-argument overload, using this feature's PDP name as the host
1606 this.insertRecord(resourceId, feature.getPdpName(), uuidString, expireOffset);
// Inserts a lock record for the given resource, host and owner, expiring expireOffset
// seconds from the DB's "now"; asserts that exactly one row was inserted.
1609 private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1610 throws SQLException {
1611 try (PreparedStatement stmt =
1612 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1613 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
// parameters are bound in the order in which the columns are listed in the statement
1615 stmt.setString(1, resourceId);
1616 stmt.setString(2, hostName);
1617 stmt.setString(3, uuidString);
1618 stmt.setInt(4, expireOffset);
1620 assertEquals(1, stmt.executeUpdate());
1625 * Updates a record in the DB.
1627 * @param resourceId ID of the resource of interest
1628 * @param newUuid UUID string of the <i>new</i> owner
1629 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1630 * @throws SQLException if an error occurs accessing the DB
1632 private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
// newHost is the value written to the record's "host" column
1633 try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1634 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
// the new values are bound first; the resource ID selecting the row comes last
1636 stmt.setString(1, newHost);
1637 stmt.setString(2, newUuid);
1638 stmt.setInt(3, expireOffset);
1639 stmt.setString(4, resourceId);
// exactly one row must have been updated
1641 assertEquals(1, stmt.executeUpdate());
1646 * Feature that uses <i>exsvc</i> to execute requests.
1648 private class MyLockingFeature extends DistributedLockManager {
1650 public MyLockingFeature(boolean init) {
// start from a fresh mock executor, so tests can capture queued work and run it themselves
1653 exsvc = mock(ScheduledExecutorService.class);
// every schedule() call on the mock returns the shared "checker" object
1654 when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
// install the mock as the policy engine manager's executor
1655 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
// presumably only performed when "init" is true - confirm against the full file
1658 beforeCreateLockManager(engine, new Properties());
1666 * Feature whose data source always throws exceptions.
1668 private class InvalidDbLockingFeature extends MyLockingFeature {
// presumably selects which of the two exceptions makeEx() builds - confirm against the full file
1669 private boolean isTransient;
1670 private boolean freeLock = false;
1672 public InvalidDbLockingFeature(boolean isTransient) {
1673 // pass "false" because we have to set the error code BEFORE calling
1677 this.isTransient = isTransient;
// the start-up sequence is driven here, after isTransient has been set
1679 this.beforeCreateLockManager(engine, new Properties());
1681 this.afterStart(engine);
1685 protected BasicDataSource makeDataSource() throws Exception {
// stub getConnection() on the shared mock data source
1686 when(datasrc.getConnection()).thenAnswer(answer -> {
// closing the data source fails as well
1695 doThrow(makeEx()).when(datasrc).close();
1700 protected SQLException makeEx() {
// an SQLException whose cause is an SQLTransientException
1702 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
// a plain SQLException, with no transient cause
1705 return new SQLException(EXPECTED_EXCEPTION);
1711 * Feature whose locks free themselves while free() is already running.
1713 private class FreeWithFreeLockingFeature extends MyLockingFeature {
// presumably controls whether the lock is re-acquired after being released - confirm against the full file
1714 private boolean relock;
1716 public FreeWithFreeLockingFeature(boolean relock) {
1718 this.relock = relock;
1722 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1723 LockCallback callback) {
// hand out a lock subclass that overrides isUnavailable()
1725 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1726 private static final long serialVersionUID = 1L;
// presumably limits the extra behaviour to the first isUnavailable() call - confirm against the full file
1727 private boolean checked = false;
1730 public boolean isUnavailable() {
1732 return super.isUnavailable();
1737 // release and relock
// requests a new lock on RESOURCE for the same owner key, with a throw-away callback
1745 createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1755 * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1756 * extend it, and then unlock it.
1758 private class MyThread extends Thread {
1759 AssertionError err = null;
1768 for (int x = 0; x < MAX_LOOPS; ++x) {
1772 } catch (AssertionError e) {
1777 private void makeAttempt() {
1779 Semaphore sem = new Semaphore(0);
1781 LockCallback cb = new LockCallback() {
1783 public void lockAvailable(Lock lock) {
1788 public void lockUnavailable(Lock lock) {
1793 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1795 // wait for callback, whether available or unavailable
1796 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1797 if (!lock.isActive()) {
1801 nsuccesses.incrementAndGet();
1803 assertEquals(1, nactive.incrementAndGet());
1805 lock.extend(HOLD_SEC2, cb);
1806 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1807 assertTrue(lock.isActive());
1809 // decrement BEFORE free()
1810 nactive.decrementAndGet();
1812 assertTrue(lock.free());
1813 assertTrue(lock.isUnavailable());
1815 } catch (InterruptedException e) {
1816 Thread.currentThread().interrupt();
1817 throw new AssertionError("interrupted", e);