2  * ============LICENSE_START=======================================================
 
   4  * ================================================================================
 
   5  * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
 
   6  * ================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License");
 
   8  * you may not use this file except in compliance with the License.
 
   9  * You may obtain a copy of the License at
 
  11  *      http://www.apache.org/licenses/LICENSE-2.0
 
  13  * Unless required by applicable law or agreed to in writing, software
 
  14  * distributed under the License is distributed on an "AS IS" BASIS,
 
  15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16  * See the License for the specific language governing permissions and
 
  17  * limitations under the License.
 
  18  * ============LICENSE_END=========================================================
 
  21 package org.onap.policy.distributed.locking;
 
  23 import static org.assertj.core.api.Assertions.assertThat;
 
  24 import static org.assertj.core.api.Assertions.assertThatCode;
 
  25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
 
  26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
  27 import static org.junit.Assert.assertEquals;
 
  28 import static org.junit.Assert.assertFalse;
 
  29 import static org.junit.Assert.assertNotNull;
 
  30 import static org.junit.Assert.assertNotSame;
 
  31 import static org.junit.Assert.assertNull;
 
  32 import static org.junit.Assert.assertSame;
 
  33 import static org.junit.Assert.assertTrue;
 
  34 import static org.mockito.ArgumentMatchers.any;
 
  35 import static org.mockito.ArgumentMatchers.anyBoolean;
 
  36 import static org.mockito.ArgumentMatchers.anyLong;
 
  37 import static org.mockito.ArgumentMatchers.eq;
 
  38 import static org.mockito.Mockito.doThrow;
 
  39 import static org.mockito.Mockito.mock;
 
  40 import static org.mockito.Mockito.never;
 
  41 import static org.mockito.Mockito.times;
 
  42 import static org.mockito.Mockito.verify;
 
  43 import static org.mockito.Mockito.when;
 
  45 import java.io.ByteArrayInputStream;
 
  46 import java.io.ByteArrayOutputStream;
 
  47 import java.io.ObjectInputStream;
 
  48 import java.io.ObjectOutputStream;
 
  49 import java.sql.Connection;
 
  50 import java.sql.DriverManager;
 
  51 import java.sql.PreparedStatement;
 
  52 import java.sql.ResultSet;
 
  53 import java.sql.SQLException;
 
  54 import java.sql.SQLTransientException;
 
  55 import java.util.ArrayList;
 
  56 import java.util.List;
 
  57 import java.util.Properties;
 
  58 import java.util.concurrent.Executors;
 
  59 import java.util.concurrent.RejectedExecutionException;
 
  60 import java.util.concurrent.ScheduledExecutorService;
 
  61 import java.util.concurrent.ScheduledFuture;
 
  62 import java.util.concurrent.Semaphore;
 
  63 import java.util.concurrent.TimeUnit;
 
  64 import java.util.concurrent.atomic.AtomicBoolean;
 
  65 import java.util.concurrent.atomic.AtomicInteger;
 
  66 import java.util.concurrent.atomic.AtomicReference;
 
  67 import org.apache.commons.dbcp2.BasicDataSource;
 
  68 import org.junit.After;
 
  69 import org.junit.AfterClass;
 
  70 import org.junit.Before;
 
  71 import org.junit.BeforeClass;
 
  72 import org.junit.Test;
 
  73 import org.junit.runner.RunWith;
 
  74 import org.kie.api.runtime.KieSession;
 
  75 import org.mockito.ArgumentCaptor;
 
  76 import org.mockito.Mock;
 
  77 import org.mockito.junit.MockitoJUnitRunner;
 
  78 import org.onap.policy.common.utils.services.OrderedServiceImpl;
 
  79 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
 
  80 import org.onap.policy.drools.core.PolicySession;
 
  81 import org.onap.policy.drools.core.lock.Lock;
 
  82 import org.onap.policy.drools.core.lock.LockCallback;
 
  83 import org.onap.policy.drools.core.lock.LockState;
 
  84 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
 
  85 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
 
  86 import org.onap.policy.drools.system.PolicyEngine;
 
  87 import org.onap.policy.drools.system.PolicyEngineConstants;
 
  88 import org.powermock.reflect.Whitebox;
 
  90 @RunWith(MockitoJUnitRunner.class)
 
  91 public class DistributedLockManagerTest {
 
  92     private static final long EXPIRE_SEC = 900L;
 
  93     private static final long RETRY_SEC = 60L;
 
  94     private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
 
  95     private static final String OTHER_HOST = "other-host";
 
  96     private static final String OTHER_OWNER = "other-owner";
 
  97     private static final String EXPECTED_EXCEPTION = "expected exception";
 
  98     private static final String DB_CONNECTION =
 
  99                     "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
 
 100     private static final String DB_USER = "user";
 
 101     private static final String DB_PASSWORD = "password";
 
 102     private static final String OWNER_KEY = "my key";
 
 103     private static final String RESOURCE = "my resource";
 
 104     private static final String RESOURCE2 = "my resource #2";
 
 105     private static final String RESOURCE3 = "my resource #3";
 
 106     private static final String RESOURCE4 = "my resource #4";
 
 107     private static final String RESOURCE5 = "my resource #5";
 
 108     private static final int HOLD_SEC = 100;
 
 109     private static final int HOLD_SEC2 = 120;
 
 110     private static final int MAX_THREADS = 5;
 
 111     private static final int MAX_LOOPS = 100;
 
 112     private static final boolean TRANSIENT = true;
 
 113     private static final boolean PERMANENT = false;
 
 115     // number of execute() calls before the first lock attempt
 
 116     private static final int PRE_LOCK_EXECS = 1;
 
 118     // number of execute() calls before the first schedule attempt
 
 119     private static final int PRE_SCHED_EXECS = 1;
 
 121     private static Connection conn = null;
 
 122     private static ScheduledExecutorService saveExec;
 
 123     private static ScheduledExecutorService realExec;
 
 126     private PolicyEngine engine;
 
 129     private KieSession kieSess;
 
 132     private ScheduledExecutorService exsvc;
 
 135     private ScheduledFuture<?> checker;
 
 138     private LockCallback callback;
 
 141     private BasicDataSource datasrc;
 
 143     private DistributedLock lock;
 
 144     private PolicySession session;
 
 146     private AtomicInteger nactive;
 
 147     private AtomicInteger nsuccesses;
 
 148     private DistributedLockManager feature;
 
 152      * Configures the location of the property files and creates the DB.
 
 154      * @throws SQLException if the DB cannot be created
 
 157     public static void setUpBeforeClass() throws SQLException {
 
 158         SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
 
 160         conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
 
 162         try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
 
 163                         + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
 
 164                         + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
 
 165             createStmt.executeUpdate();
 
 168         saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
 
 170         realExec = Executors.newScheduledThreadPool(3);
 
 174      * Restores static fields.
 
 177     public static void tearDownAfterClass() throws SQLException {
 
 178         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
 
 184      * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
 
 187      * @throws SQLException if the lock records cannot be deleted from the DB
 
 190     public void setUp() throws SQLException {
 
 191         // grant() and deny() calls will come through here and be immediately executed
 
 192         session = new PolicySession(null, null, kieSess) {
 
 194             public void insertDrools(Object object) {
 
 195                 ((Runnable) object).run();
 
 199         session.setPolicySession();
 
 201         nactive = new AtomicInteger(0);
 
 202         nsuccesses = new AtomicInteger(0);
 
 206         feature = new MyLockingFeature(true);
 
 210     public void tearDown() throws SQLException {
 
 215     private void cleanDb() throws SQLException {
 
 216         try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
 
 217             stmt.executeUpdate();
 
 221     private void shutdownFeature() {
 
 222         if (feature != null) {
 
 223             feature.afterStop(engine);
 
 229      * Tests that the feature is found in the expected service sets.
 
 232     public void testServiceApis() {
 
 233         assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
 
 234                         .anyMatch(obj -> obj instanceof DistributedLockManager));
 
 238     public void testGetSequenceNumber() {
 
 239         assertEquals(1000, feature.getSequenceNumber());
 
 243     public void testBeforeCreateLockManager() {
 
 244         assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
 
 248      * Tests beforeCreate(), when getProperties() throws a runtime exception.
 
 251     public void testBeforeCreateLockManagerEx() {
 
 254         feature = new MyLockingFeature(false) {
 
 256             protected Properties getProperties(String fileName) {
 
 257                 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
 
 261         Properties props = new Properties();
 
 262         assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, props))
 
 263                         .isInstanceOf(DistributedLockManagerException.class);
 
 267     public void testAfterStart() {
 
 268         // verify that cleanup & expire check are both added to the queue
 
 269         verify(exsvc).execute(any());
 
 270         verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
 
 274      * Tests afterStart(), when thread pool throws a runtime exception.
 
 277     public void testAfterStartExInThreadPool() {
 
 280         feature = new MyLockingFeature(false);
 
 282         doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
 
 284         assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
 
 288     public void testDeleteExpiredDbLocks() throws SQLException {
 
 289         // add records: two expired, one not
 
 290         insertRecord(RESOURCE, feature.getUuidString(), -1);
 
 291         insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
 
 292         insertRecord(RESOURCE3, OTHER_OWNER, 0);
 
 293         insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
 
 295         // get the clean-up function and execute it
 
 296         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
 
 297         verify(exsvc).execute(captor.capture());
 
 299         long tbegin = System.currentTimeMillis();
 
 300         Runnable action = captor.getValue();
 
 303         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
 
 304         assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
 
 305         assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
 
 306         assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
 
 308         assertEquals(2, getRecordCount());
 
 312      * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
 
 314      * @throws SQLException if an error occurs
 
 317     public void testDeleteExpiredDbLocksEx() throws SQLException {
 
 318         feature = new InvalidDbLockingFeature(TRANSIENT);
 
 320         // get the clean-up function and execute it
 
 321         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
 
 322         verify(exsvc).execute(captor.capture());
 
 324         Runnable action = captor.getValue();
 
 326         // should not throw an exception
 
 331     public void testAfterStop() {
 
 333         verify(checker).cancel(anyBoolean());
 
 335         feature = new DistributedLockManager();
 
 337         // shutdown without calling afterStart()
 
 343      * Tests afterStop(), when the data source throws an exception when close() is called.
 
 345      * @throws SQLException if an error occurs
 
 348     public void testAfterStopEx() throws SQLException {
 
 351         // use a data source that throws an exception when closed
 
 352         feature = new InvalidDbLockingFeature(TRANSIENT);
 
 354         assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
 
 358     public void testCreateLock() throws SQLException {
 
 359         verify(exsvc).execute(any());
 
 361         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 362         assertTrue(lock.isWaiting());
 
 364         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
 
 366         // this lock should fail
 
 367         LockCallback callback2 = mock(LockCallback.class);
 
 368         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
 
 369         assertTrue(lock2.isUnavailable());
 
 370         verify(callback2, never()).lockAvailable(lock2);
 
 371         verify(callback2).lockUnavailable(lock2);
 
 373         // this should fail, too
 
 374         LockCallback callback3 = mock(LockCallback.class);
 
 375         DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
 
 376         assertTrue(lock3.isUnavailable());
 
 377         verify(callback3, never()).lockAvailable(lock3);
 
 378         verify(callback3).lockUnavailable(lock3);
 
 380         // no change to first
 
 381         assertTrue(lock.isWaiting());
 
 383         // no callbacks to the first lock
 
 384         verify(callback, never()).lockAvailable(lock);
 
 385         verify(callback, never()).lockUnavailable(lock);
 
 387         assertTrue(lock.isWaiting());
 
 388         assertEquals(0, getRecordCount());
 
 391         assertTrue(lock.isActive());
 
 392         assertEquals(1, getRecordCount());
 
 394         verify(callback).lockAvailable(lock);
 
 395         verify(callback, never()).lockUnavailable(lock);
 
 397         // this should succeed
 
 398         DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
 
 399         assertTrue(lock4.isWaiting());
 
 401         // after running checker, original records should still remain
 
 402         runChecker(0, 0, EXPIRE_SEC);
 
 403         assertEquals(1, getRecordCount());
 
 404         verify(callback, never()).lockUnavailable(lock);
 
 408      * Tests createLock() when the feature is not the latest instance.
 
 411     public void testCreateLockNotLatestInstance() {
 
 412         DistributedLockManager.setLatestInstance(null);
 
 414         Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 415         assertTrue(lock.isUnavailable());
 
 416         verify(callback, never()).lockAvailable(any());
 
 417         verify(callback).lockUnavailable(lock);
 
 421     public void testCheckExpired() throws SQLException {
 
 422         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 425         LockCallback callback2 = mock(LockCallback.class);
 
 426         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
 
 429         LockCallback callback3 = mock(LockCallback.class);
 
 430         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
 
 433         LockCallback callback4 = mock(LockCallback.class);
 
 434         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
 
 437         LockCallback callback5 = mock(LockCallback.class);
 
 438         final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
 
 441         assertEquals(5, getRecordCount());
 
 444         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
 
 446         // change host of another record
 
 447         updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
 
 449         // change uuid of another record
 
 450         updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
 
 454         runChecker(0, 0, EXPIRE_SEC);
 
 458         assertTrue(lock.isUnavailable());
 
 459         assertTrue(lock2.isActive());
 
 460         assertTrue(lock3.isUnavailable());
 
 461         assertTrue(lock4.isActive());
 
 462         assertTrue(lock5.isUnavailable());
 
 468         verify(callback).lockUnavailable(lock);
 
 469         verify(callback3).lockUnavailable(lock3);
 
 470         verify(callback5).lockUnavailable(lock5);
 
 472         verify(callback2, never()).lockUnavailable(lock2);
 
 473         verify(callback4, never()).lockUnavailable(lock4);
 
 476         // another check should have been scheduled, with the normal interval
 
 477         runChecker(1, 0, EXPIRE_SEC);
 
 481      * Tests checkExpired(), when schedule() throws an exception.
 
 484     public void testCheckExpiredExecRejected() {
 
 485         // arrange for execution to be rejected
 
 486         when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
 
 487                         .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
 
 489         runChecker(0, 0, EXPIRE_SEC);
 
 493      * Tests checkExpired(), when getConnection() throws an exception.
 
 496     public void testCheckExpiredSqlEx() {
 
 497         // use a data source that throws an exception when getConnection() is called
 
 498         feature = new InvalidDbLockingFeature(TRANSIENT);
 
 500         runChecker(0, 0, EXPIRE_SEC);
 
 502         // it should have scheduled another check, sooner
 
 503         runChecker(0, 0, RETRY_SEC);
 
 507      * Tests checkExpired(), when getConnection() throws an exception and the feature is
 
 511     public void testCheckExpiredSqlExFeatureStopped() {
 
 512         // use a data source that throws an exception when getConnection() is called
 
 513         feature = new InvalidDbLockingFeature(TRANSIENT) {
 
 515             protected SQLException makeEx() {
 
 517                 return super.makeEx();
 
 521         runChecker(0, 0, EXPIRE_SEC);
 
 523         // it should NOT have scheduled another check
 
 524         verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
 
 528     public void testExpireLocks() throws SQLException {
 
 529         AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
 
 531         feature = new MyLockingFeature(true) {
 
 533             protected BasicDataSource makeDataSource() throws Exception {
 
 534                 // get the real data source
 
 535                 BasicDataSource src2 = super.makeDataSource();
 
 537                 when(datasrc.getConnection()).thenAnswer(answer -> {
 
 538                     DistributedLock lck = freeLock.getAndSet(null);
 
 547                     return src2.getConnection();
 
 554         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 557         LockCallback callback2 = mock(LockCallback.class);
 
 558         final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
 
 561         LockCallback callback3 = mock(LockCallback.class);
 
 562         final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
 
 563         // don't run doLock for lock3 - leave it in the waiting state
 
 565         LockCallback callback4 = mock(LockCallback.class);
 
 566         final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
 
 569         assertEquals(3, getRecordCount());
 
 572         updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
 
 574         // arrange to free lock4 while the checker is running
 
 578         runChecker(0, 0, EXPIRE_SEC);
 
 582         assertTrue(lock.isUnavailable());
 
 583         assertTrue(lock2.isActive());
 
 584         assertTrue(lock3.isWaiting());
 
 585         assertTrue(lock4.isUnavailable());
 
 588         verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
 
 590         verify(callback).lockUnavailable(lock);
 
 591         verify(callback2, never()).lockUnavailable(lock2);
 
 592         verify(callback3, never()).lockUnavailable(lock3);
 
 593         verify(callback4, never()).lockUnavailable(lock4);
 
 597     public void testDistributedLockNoArgs() {
 
 598         DistributedLock lock = new DistributedLock();
 
 599         assertNull(lock.getResourceId());
 
 600         assertNull(lock.getOwnerKey());
 
 601         assertNull(lock.getCallback());
 
 602         assertEquals(0, lock.getHoldSec());
 
 606     public void testDistributedLock() {
 
 607         assertThatIllegalArgumentException()
 
 608                         .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
 
 609                         .withMessageContaining("holdSec");
 
 611         // should generate no exception
 
 612         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 616     public void testDistributedLockSerializable() throws Exception {
 
 617         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 618         lock = roundTrip(lock);
 
 620         assertTrue(lock.isWaiting());
 
 622         assertEquals(RESOURCE, lock.getResourceId());
 
 623         assertEquals(OWNER_KEY, lock.getOwnerKey());
 
 624         assertNull(lock.getCallback());
 
 625         assertEquals(HOLD_SEC, lock.getHoldSec());
 
 629     public void testGrant() {
 
 630         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 631         assertFalse(lock.isActive());
 
 633         // execute the doLock() call
 
 636         assertTrue(lock.isActive());
 
 638         // the callback for the lock should have been run in the foreground thread
 
 639         verify(callback).lockAvailable(lock);
 
 643     public void testDistributedLockDeny() {
 
 645         feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 647         // get another lock - should fail
 
 648         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 650         assertTrue(lock.isUnavailable());
 
 652         // the callback for the second lock should have been run in the foreground thread
 
 653         verify(callback).lockUnavailable(lock);
 
 655         // should only have a request for the first lock
 
 656         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
 
 660     public void testDistributedLockFree() {
 
 661         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 663         assertTrue(lock.free());
 
 664         assertTrue(lock.isUnavailable());
 
 666         // run both requests associated with the lock
 
 670         // should not have changed state
 
 671         assertTrue(lock.isUnavailable());
 
 673         // attempt to free it again
 
 674         assertFalse(lock.free());
 
 676         // should not have queued anything else
 
 677         verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
 
 679         // new lock should succeed
 
 680         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 681         assertNotSame(lock2, lock);
 
 682         assertTrue(lock2.isWaiting());
 
 686      * Tests that free() works on a serialized lock with a new feature.
 
 688      * @throws Exception if an error occurs
 
 691     public void testDistributedLockFreeSerialized() throws Exception {
 
 692         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 694         feature = new MyLockingFeature(true);
 
 696         lock = roundTrip(lock);
 
 697         assertTrue(lock.free());
 
 698         assertTrue(lock.isUnavailable());
 
 702      * Tests free() on a serialized lock without a feature.
 
 704      * @throws Exception if an error occurs
 
 707     public void testDistributedLockFreeNoFeature() throws Exception {
 
 708         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 710         DistributedLockManager.setLatestInstance(null);
 
 712         lock = roundTrip(lock);
 
 713         assertFalse(lock.free());
 
 714         assertTrue(lock.isUnavailable());
 
 718      * Tests the case where the lock is freed and doUnlock called between the call to
 
 719      * isUnavailable() and the call to compute().
 
 722     public void testDistributedLockFreeUnlocked() {
 
 723         feature = new FreeWithFreeLockingFeature(true);
 
 725         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 727         assertFalse(lock.free());
 
 728         assertTrue(lock.isUnavailable());
 
 732      * Tests the case where the lock is freed, but doUnlock is not completed, between the
 
 733      * call to isUnavailable() and the call to compute().
 
 736     public void testDistributedLockFreeLockFreed() {
 
 737         feature = new FreeWithFreeLockingFeature(false);
 
 739         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 741         assertFalse(lock.free());
 
 742         assertTrue(lock.isUnavailable());
 
 746     public void testDistributedLockExtend() {
 
 747         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 749         // lock2 should be denied - called back by this thread
 
 750         DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 751         verify(callback, never()).lockAvailable(lock2);
 
 752         verify(callback).lockUnavailable(lock2);
 
 754         // lock2 will still be denied - called back by this thread
 
 755         lock2.extend(HOLD_SEC, callback);
 
 756         verify(callback, times(2)).lockUnavailable(lock2);
 
 758         // force lock2 to be active - should still be denied
 
 759         Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
 
 760         lock2.extend(HOLD_SEC, callback);
 
 761         verify(callback, times(3)).lockUnavailable(lock2);
 
 763         assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
 
 764                         .withMessageContaining("holdSec");
 
 768         assertTrue(lock.isActive());
 
 770         // now extend the first lock
 
 771         LockCallback callback2 = mock(LockCallback.class);
 
 772         lock.extend(HOLD_SEC2, callback2);
 
 773         assertTrue(lock.isWaiting());
 
 775         // execute doExtend()
 
 777         lock.extend(HOLD_SEC2, callback2);
 
 778         assertEquals(HOLD_SEC2, lock.getHoldSec());
 
 779         verify(callback2).lockAvailable(lock);
 
 780         verify(callback2, never()).lockUnavailable(lock);
 
 784      * Tests that extend() works on a serialized lock with a new feature.
 
 786      * @throws Exception if an error occurs
 
 789     public void testDistributedLockExtendSerialized() throws Exception {
 
 790         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 794         assertTrue(lock.isActive());
 
 796         feature = new MyLockingFeature(true);
 
 798         lock = roundTrip(lock);
 
 799         assertTrue(lock.isActive());
 
 801         LockCallback scallback = mock(LockCallback.class);
 
 803         lock.extend(HOLD_SEC, scallback);
 
 804         assertTrue(lock.isWaiting());
 
 806         // run doExtend (in new feature)
 
 808         assertTrue(lock.isActive());
 
 810         verify(scallback).lockAvailable(lock);
 
 811         verify(scallback, never()).lockUnavailable(lock);
 
 815      * Tests extend() on a serialized lock without a feature.
 
 817      * @throws Exception if an error occurs
 
 820     public void testDistributedLockExtendNoFeature() throws Exception {
 
 821         DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 825         assertTrue(lock.isActive());
 
 827         DistributedLockManager.setLatestInstance(null);
 
 829         lock = roundTrip(lock);
 
 830         assertTrue(lock.isActive());
 
 832         LockCallback scallback = mock(LockCallback.class);
 
 834         lock.extend(HOLD_SEC, scallback);
 
 835         assertTrue(lock.isUnavailable());
 
 837         verify(scallback, never()).lockAvailable(lock);
 
 838         verify(scallback).lockUnavailable(lock);
 
 842      * Tests the case where the lock is freed and doUnlock called between the call to
 
 843      * isUnavailable() and the call to compute().
 
 846     public void testDistributedLockExtendUnlocked() {
 
 847         feature = new FreeWithFreeLockingFeature(true);
 
 849         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 851         lock.extend(HOLD_SEC2, callback);
 
 852         assertTrue(lock.isUnavailable());
 
 853         verify(callback).lockUnavailable(lock);
 
 857      * Tests the case where the lock is freed, but doUnlock is not completed, between the
 
 858      * call to isUnavailable() and the call to compute().
 
 861     public void testDistributedLockExtendLockFreed() {
 
 862         feature = new FreeWithFreeLockingFeature(false);
 
 864         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 866         lock.extend(HOLD_SEC2, callback);
 
 867         assertTrue(lock.isUnavailable());
 
 868         verify(callback).lockUnavailable(lock);
 
 872     public void testDistributedLockScheduleRequest() {
 
 873         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 876         verify(callback).lockAvailable(lock);
 
 880     public void testDistributedLockRescheduleRequest() throws SQLException {
 
 881         // use a data source that throws an exception when getConnection() is called
 
 882         InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
 
 885         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 887         // invoke doLock - should fail and reschedule
 
 890         // should still be waiting
 
 891         assertTrue(lock.isWaiting());
 
 892         verify(callback, never()).lockUnavailable(lock);
 
 894         // free the lock while doLock is executing
 
 895         invfeat.freeLock = true;
 
 897         // try scheduled request - should just invoke doUnlock
 
 900         // should still be waiting
 
 901         assertTrue(lock.isUnavailable());
 
 902         verify(callback, never()).lockUnavailable(lock);
 
 904         // should have scheduled a retry of doUnlock
 
 905         verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
 
 909     public void testDistributedLockGetNextRequest() {
 
 910         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 913          * run doLock. This should cause getNextRequest() to be called twice, once with a
 
 914          * request in the queue, and the second time with request=null.
 
 920      * Tests getNextRequest(), where the same request is still in the queue the second
 
 924     public void testDistributedLockGetNextRequestSameRequest() {
 
 925         // force reschedule to be invoked
 
 926         feature = new InvalidDbLockingFeature(TRANSIENT);
 
 928         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 931          * run doLock. This should cause getNextRequest() to be called twice, once with a
 
 932          * request in the queue, and the second time with the same request again.
 
 936         verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
 
 940     public void testDistributedLockDoRequest() throws SQLException {
 
 941         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
 943         assertTrue(lock.isWaiting());
 
 945         // run doLock via doRequest
 
 948         assertTrue(lock.isActive());
 
 952      * Tests doRequest(), when doRequest() is already running within another thread.
 
 955     public void testDistributedLockDoRequestBusy() {
 
 957          * this feature will invoke a request in a background thread while it's being run
 
 958          * in a foreground thread.
 
 960         AtomicBoolean running = new AtomicBoolean(false);
 
 961         AtomicBoolean returned = new AtomicBoolean(false);
 
 963         feature = new MyLockingFeature(true) {
 
 965             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
 
 966                             LockCallback callback) {
 
 967                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
 
 968                     private static final long serialVersionUID = 1L;
 
 971                     protected boolean doDbInsert(Connection conn) throws SQLException {
 
 973                             // already inside the thread - don't recurse any further
 
 974                             return super.doDbInsert(conn);
 
 979                         Thread thread = new Thread(() -> {
 
 980                             // run doLock from within the new thread
 
 983                         thread.setDaemon(true);
 
 986                         // wait for the background thread to complete before continuing
 
 989                         } catch (InterruptedException ignore) {
 
 990                             Thread.currentThread().interrupt();
 
 993                         returned.set(!thread.isAlive());
 
 995                         return super.doDbInsert(conn);
 
1001         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1006         assertTrue(returned.get());
 
1010      * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
 
1012      * @throws SQLException if an error occurs
 
1015     public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
 
1016         // throw run-time exception
 
1017         when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
 
1019         // use a data source that throws an exception when getConnection() is called
 
1020         feature = new MyLockingFeature(true) {
 
1022             protected BasicDataSource makeDataSource() throws Exception {
 
1027         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1029         // invoke doLock - should NOT reschedule
 
1032         assertTrue(lock.isUnavailable());
 
1033         verify(callback).lockUnavailable(lock);
 
1035         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
 
1039      * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
 
1042      * @throws SQLException if an error occurs
 
1045     public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
 
1046         // throw run-time exception
 
1047         when(datasrc.getConnection()).thenAnswer(answer -> {
 
1049             throw new IllegalStateException(EXPECTED_EXCEPTION);
 
1052         // use a data source that throws an exception when getConnection() is called
 
1053         feature = new MyLockingFeature(true) {
 
1055             protected BasicDataSource makeDataSource() throws Exception {
 
1060         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1062         // invoke doLock - should NOT reschedule
 
1065         assertTrue(lock.isUnavailable());
 
1066         verify(callback, never()).lockUnavailable(lock);
 
1068         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
 
1072      * Tests doRequest() when the retry count gets exhausted.
 
1075     public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
 
1076         // use a data source that throws an exception when getConnection() is called
 
1077         feature = new InvalidDbLockingFeature(TRANSIENT);
 
1079         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1081         // invoke doLock - should fail and reschedule
 
1084         // should still be waiting
 
1085         assertTrue(lock.isWaiting());
 
1086         verify(callback, never()).lockUnavailable(lock);
 
1088         // try again, via SCHEDULER - first retry fails
 
1091         // should still be waiting
 
1092         assertTrue(lock.isWaiting());
 
1093         verify(callback, never()).lockUnavailable(lock);
 
1095         // try again, via SCHEDULER - final retry fails
 
1097         assertTrue(lock.isUnavailable());
 
1099         // now callback should have been called
 
1100         verify(callback).lockUnavailable(lock);
 
1104      * Tests doRequest() when a non-transient DB exception is thrown.
 
1107     public void testDistributedLockDoRequestNotTransient() {
 
1109          * use a data source that throws a PERMANENT exception when getConnection() is
 
1112         feature = new InvalidDbLockingFeature(PERMANENT);
 
1114         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1116         // invoke doLock - should fail
 
1119         assertTrue(lock.isUnavailable());
 
1120         verify(callback).lockUnavailable(lock);
 
1122         // should not have scheduled anything new
 
1123         verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
 
1124         verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
 
1128     public void testDistributedLockDoLock() throws SQLException {
 
1129         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1131         // invoke doLock - should simply do an insert
 
1132         long tbegin = System.currentTimeMillis();
 
1135         assertEquals(1, getRecordCount());
 
1136         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
 
1137         verify(callback).lockAvailable(lock);
 
1141      * Tests doLock() when the lock is freed before doLock runs.
 
1143      * @throws SQLException if an error occurs
 
1146     public void testDistributedLockDoLockFreed() throws SQLException {
 
1147         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1149         lock.setState(LockState.UNAVAILABLE);
 
1151         // invoke doLock - should do nothing
 
1154         assertEquals(0, getRecordCount());
 
1156         verify(callback, never()).lockAvailable(lock);
 
1160      * Tests doLock() when a DB exception is thrown.
 
1163     public void testDistributedLockDoLockEx() {
 
1164         // use a data source that throws an exception when getConnection() is called
 
1165         feature = new InvalidDbLockingFeature(PERMANENT);
 
1167         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1169         // invoke doLock - should simply do an insert
 
1172         // lock should have failed due to exception
 
1173         verify(callback).lockUnavailable(lock);
 
1177      * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
 
1181     public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
 
1182         // insert an expired record
 
1183         insertRecord(RESOURCE, feature.getUuidString(), 0);
 
1185         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1187         // invoke doLock - should simply do an update
 
1189         verify(callback).lockAvailable(lock);
 
1193      * Tests doLock() when a locked record already exists.
 
1196     public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
 
1197         // insert an expired record
 
1198         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
 
1200         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1205         // lock should have failed because it's already locked
 
1206         verify(callback).lockUnavailable(lock);
 
1210     public void testDistributedLockDoUnlock() throws SQLException {
 
1211         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1218         // invoke doUnlock()
 
1219         long tbegin = System.currentTimeMillis();
 
1222         assertEquals(0, getRecordCount());
 
1223         assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
 
1225         assertTrue(lock.isUnavailable());
 
1227         // no more callbacks should have occurred
 
1228         verify(callback, times(1)).lockAvailable(lock);
 
1229         verify(callback, never()).lockUnavailable(lock);
 
1233      * Tests doUnlock() when a DB exception is thrown.
 
1235      * @throws SQLException if an error occurs
 
1238     public void testDistributedLockDoUnlockEx() throws SQLException {
 
1239         feature = new InvalidDbLockingFeature(PERMANENT);
 
1241         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1243         // do NOT invoke doLock() - it will fail without a DB connection
 
1247         // invoke doUnlock()
 
1250         assertTrue(lock.isUnavailable());
 
1252         // no more callbacks should have occurred
 
1253         verify(callback, never()).lockAvailable(lock);
 
1254         verify(callback, never()).lockUnavailable(lock);
 
1258     public void testDistributedLockDoExtend() throws SQLException {
 
1259         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1262         LockCallback callback2 = mock(LockCallback.class);
 
1263         lock.extend(HOLD_SEC2, callback2);
 
1266         long tbegin = System.currentTimeMillis();
 
1269         assertEquals(1, getRecordCount());
 
1270         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
 
1272         assertTrue(lock.isActive());
 
1274         // no more callbacks should have occurred
 
1275         verify(callback).lockAvailable(lock);
 
1276         verify(callback, never()).lockUnavailable(lock);
 
1278         // extension should have succeeded
 
1279         verify(callback2).lockAvailable(lock);
 
1280         verify(callback2, never()).lockUnavailable(lock);
 
1284      * Tests doExtend() when the lock is freed before doExtend runs.
 
1286      * @throws SQLException if an error occurs
 
1289     public void testDistributedLockDoExtendFreed() throws SQLException {
 
1290         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1291         lock.extend(HOLD_SEC2, callback);
 
1293         lock.setState(LockState.UNAVAILABLE);
 
1295         // invoke doExtend - should do nothing
 
1298         assertEquals(0, getRecordCount());
 
1300         verify(callback, never()).lockAvailable(lock);
 
1304      * Tests doExtend() when the lock record is missing from the DB, thus requiring an
 
1307      * @throws SQLException if an error occurs
 
1310     public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
 
1311         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1314         LockCallback callback2 = mock(LockCallback.class);
 
1315         lock.extend(HOLD_SEC2, callback2);
 
1317         // delete the record so it's forced to re-insert it
 
1321         long tbegin = System.currentTimeMillis();
 
1324         assertEquals(1, getRecordCount());
 
1325         assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
 
1327         assertTrue(lock.isActive());
 
1329         // no more callbacks should have occurred
 
1330         verify(callback).lockAvailable(lock);
 
1331         verify(callback, never()).lockUnavailable(lock);
 
1333         // extension should have succeeded
 
1334         verify(callback2).lockAvailable(lock);
 
1335         verify(callback2, never()).lockUnavailable(lock);
 
1339      * Tests doExtend() when both update and insert fail.
 
1341      * @throws SQLException if an error occurs
 
1344     public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
 
1346          * this feature will create a lock that returns false when doDbUpdate() is
 
1347          * invoked, or when doDbInsert() is invoked a second time
 
1349         feature = new MyLockingFeature(true) {
 
1351             protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
 
1352                             LockCallback callback) {
 
1353                 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
 
1354                     private static final long serialVersionUID = 1L;
 
1355                     private int ntimes = 0;
 
1358                     protected boolean doDbInsert(Connection conn) throws SQLException {
 
1363                         return super.doDbInsert(conn);
 
1367                     protected boolean doDbUpdate(Connection conn) {
 
1374         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1377         LockCallback callback2 = mock(LockCallback.class);
 
1378         lock.extend(HOLD_SEC2, callback2);
 
1383         assertTrue(lock.isUnavailable());
 
1385         // no more callbacks should have occurred
 
1386         verify(callback).lockAvailable(lock);
 
1387         verify(callback, never()).lockUnavailable(lock);
 
1389         // extension should have failed
 
1390         verify(callback2, never()).lockAvailable(lock);
 
1391         verify(callback2).lockUnavailable(lock);
 
1395      * Tests doExtend() when an exception occurs.
 
1397      * @throws SQLException if an error occurs
 
1400     public void testDistributedLockDoExtendEx() throws SQLException {
 
1401         lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
 
1405          * delete the record and insert one with a different owner, which will cause
 
1406          * doDbInsert() to throw an exception
 
1409         insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
 
1411         LockCallback callback2 = mock(LockCallback.class);
 
1412         lock.extend(HOLD_SEC2, callback2);
 
1417         assertTrue(lock.isUnavailable());
 
1419         // no more callbacks should have occurred
 
1420         verify(callback).lockAvailable(lock);
 
1421         verify(callback, never()).lockUnavailable(lock);
 
1423         // extension should have failed
 
1424         verify(callback2, never()).lockAvailable(lock);
 
1425         verify(callback2).lockUnavailable(lock);
 
1429     public void testDistributedLockToString() {
 
1430         String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
 
1431         assertNotNull(text);
 
1432         assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
 
1436     public void testMakeThreadPool() {
 
1437         // use a REAL feature to test this
 
1438         feature = new DistributedLockManager();
 
1440         // this should create a thread pool
 
1441         feature.beforeCreateLockManager(engine, new Properties());
 
1442         feature.afterStart(engine);
 
1444         assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
 
1448      * Performs a multi-threaded test of the locking facility.
 
1450      * @throws InterruptedException if the current thread is interrupted while waiting for
 
1451      *         the background threads to complete
 
1454     public void testMultiThreaded() throws InterruptedException {
 
1455         Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
 
1457         feature = new DistributedLockManager();
 
1458         feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
 
1459         feature.afterStart(PolicyEngineConstants.getManager());
 
1461         List<MyThread> threads = new ArrayList<>(MAX_THREADS);
 
1462         for (int x = 0; x < MAX_THREADS; ++x) {
 
1463             threads.add(new MyThread());
 
1466         threads.forEach(Thread::start);
 
1468         for (MyThread thread : threads) {
 
1470             assertFalse(thread.isAlive());
 
1473         for (MyThread thread : threads) {
 
1474             if (thread.err != null) {
 
1479         assertTrue(nsuccesses.get() > 0);
 
1482     private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
 
1483                     boolean waitForLock) {
 
1484         return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
 
1487     private DistributedLock roundTrip(DistributedLock lock) throws Exception {
 
1488         ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
1489         try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
 
1490             oos.writeObject(lock);
 
1493         ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
 
1494         try (ObjectInputStream ois = new ObjectInputStream(bais)) {
 
1495             return (DistributedLock) ois.readObject();
 
1500      * Runs the checkExpired() action.
 
1502      * @param nskip number of actions in the work queue to skip
 
1503      * @param nadditional number of additional actions that appear in the work queue
 
1504      *        <i>after</i> the checkExpired action
 
1505      * @param schedSec number of seconds for which the checker should have been scheduled
 
1507     private void runChecker(int nskip, int nadditional, long schedSec) {
 
1508         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
 
1509         verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
 
1510         Runnable action = captor.getAllValues().get(nskip);
 
1515      * Runs a lock action (e.g., doLock, doUnlock).
 
1517      * @param nskip number of actions in the work queue to skip
 
1518      * @param nadditional number of additional actions that appear in the work queue
 
1519      *        <i>after</i> the desired action
 
1521     void runLock(int nskip, int nadditional) {
 
1522         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
 
1523         verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
 
1525         Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
 
1530      * Runs a scheduled action (e.g., "retry" action).
 
1532      * @param nskip number of actions in the work queue to skip
 
1533      * @param nadditional number of additional actions that appear in the work queue
 
1534      *        <i>after</i> the desired action
 
1536     void runSchedule(int nskip, int nadditional) {
 
1537         ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
 
1538         verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
 
1540         Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
 
1545      * Gets a count of the number of lock records in the DB.
 
1547      * @return the number of lock records in the DB
 
1548      * @throws SQLException if an error occurs accessing the DB
 
1550     private int getRecordCount() throws SQLException {
 
1551         try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
 
1552                         ResultSet result = stmt.executeQuery()) {
 
1554             if (result.next()) {
 
1555                 return result.getInt(1);
 
1564      * Determines if there is a record for the given resource whose expiration time is in
 
1565      * the expected range.
 
1567      * @param resourceId ID of the resource of interest
 
1568      * @param uuidString UUID string of the owner
 
1569      * @param holdSec seconds for which the lock was to be held
 
1570      * @param tbegin earliest time, in milliseconds, at which the record could have been
 
1571      *        inserted into the DB
 
1572      * @return {@code true} if a record is found, {@code false} otherwise
 
1573      * @throws SQLException if an error occurs accessing the DB
 
1575     private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
 
1576         try (PreparedStatement stmt =
 
1577                         conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
 
1578                                         + " WHERE resourceId=? AND host=? AND owner=?")) {
 
1580             stmt.setString(1, resourceId);
 
1581             stmt.setString(2, feature.getHostName());
 
1582             stmt.setString(3, uuidString);
 
1584             try (ResultSet result = stmt.executeQuery()) {
 
1585                 if (result.next()) {
 
1586                     int remaining = result.getInt(1);
 
1587                     long maxDiff = System.currentTimeMillis() - tbegin;
 
1588                     return (remaining >= 0 && holdSec - remaining <= maxDiff);
 
1598      * Inserts a record into the DB.
 
1600      * @param resourceId ID of the resource of interest
 
1601      * @param uuidString UUID string of the owner
 
1602      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
 
1603      * @throws SQLException if an error occurs accessing the DB
 
1605     private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
 
1606         this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
 
1609     private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
 
1610                     throws SQLException {
 
1611         try (PreparedStatement stmt =
 
1612                         conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
 
1613                                         + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
 
1615             stmt.setString(1, resourceId);
 
1616             stmt.setString(2, hostName);
 
1617             stmt.setString(3, uuidString);
 
1618             stmt.setInt(4, expireOffset);
 
1620             assertEquals(1, stmt.executeUpdate());
 
1625      * Updates a record in the DB.
 
1627      * @param resourceId ID of the resource of interest
 
1628      * @param newUuid UUID string of the <i>new</i> owner
 
1629      * @param expireOffset offset, in seconds, from "now", at which the lock should expire
 
1630      * @throws SQLException if an error occurs accessing the DB
 
1632     private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
 
1633         try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
 
1634                         + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
 
1636             stmt.setString(1, newHost);
 
1637             stmt.setString(2, newUuid);
 
1638             stmt.setInt(3, expireOffset);
 
1639             stmt.setString(4, resourceId);
 
1641             assertEquals(1, stmt.executeUpdate());
 
1646      * Feature that uses <i>exsvc</i> to execute requests.
 
1648     private class MyLockingFeature extends DistributedLockManager {
 
1650         public MyLockingFeature(boolean init) {
 
1653             exsvc = mock(ScheduledExecutorService.class);
 
1654             when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
 
1655             Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
 
1658                 beforeCreateLockManager(engine, new Properties());
 
1666      * Feature whose data source all throws exceptions.
 
1668     private class InvalidDbLockingFeature extends MyLockingFeature {
 
1669         private boolean isTransient;
 
1670         private boolean freeLock = false;
 
1672         public InvalidDbLockingFeature(boolean isTransient) {
 
1673             // pass "false" because we have to set the error code BEFORE calling
 
1677             this.isTransient = isTransient;
 
1679             this.beforeCreateLockManager(engine, new Properties());
 
1681             this.afterStart(engine);
 
1685         protected BasicDataSource makeDataSource() throws Exception {
 
1686             when(datasrc.getConnection()).thenAnswer(answer -> {
 
1695             doThrow(makeEx()).when(datasrc).close();
 
1700         protected SQLException makeEx() {
 
1702                 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
 
1705                 return new SQLException(EXPECTED_EXCEPTION);
 
1711      * Feature whose locks free themselves while free() is already running.
 
1713     private class FreeWithFreeLockingFeature extends MyLockingFeature {
 
1714         private boolean relock;
 
1716         public FreeWithFreeLockingFeature(boolean relock) {
 
1718             this.relock = relock;
 
1722         protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
 
1723                         LockCallback callback) {
 
1725             return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
 
1726                 private static final long serialVersionUID = 1L;
 
1727                 private boolean checked = false;
 
1730                 public boolean isUnavailable() {
 
1732                         return super.isUnavailable();
 
1737                     // release and relock
 
1745                         createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
 
1755      * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
 
1756      * extend it, and then unlock it.
 
1758     private class MyThread extends Thread {
 
1759         AssertionError err = null;
 
1768                 for (int x = 0; x < MAX_LOOPS; ++x) {
 
1772             } catch (AssertionError e) {
 
1777         private void makeAttempt() {
 
1779                 Semaphore sem = new Semaphore(0);
 
1781                 LockCallback cb = new LockCallback() {
 
1783                     public void lockAvailable(Lock lock) {
 
1788                     public void lockUnavailable(Lock lock) {
 
1793                 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
 
1795                 // wait for callback, whether available or unavailable
 
1796                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
 
1797                 if (!lock.isActive()) {
 
1801                 nsuccesses.incrementAndGet();
 
1803                 assertEquals(1, nactive.incrementAndGet());
 
1805                 lock.extend(HOLD_SEC2, cb);
 
1806                 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
 
1807                 assertTrue(lock.isActive());
 
1809                 // decrement BEFORE free()
 
1810                 nactive.decrementAndGet();
 
1812                 assertTrue(lock.free());
 
1813                 assertTrue(lock.isUnavailable());
 
1815             } catch (InterruptedException e) {
 
1816                 Thread.currentThread().interrupt();
 
1817                 throw new AssertionError("interrupted", e);