2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNotNull;
30 import static org.junit.Assert.assertNotSame;
31 import static org.junit.Assert.assertNull;
32 import static org.junit.Assert.assertSame;
33 import static org.junit.Assert.assertTrue;
34 import static org.mockito.ArgumentMatchers.any;
35 import static org.mockito.ArgumentMatchers.anyBoolean;
36 import static org.mockito.ArgumentMatchers.anyLong;
37 import static org.mockito.ArgumentMatchers.eq;
38 import static org.mockito.Mockito.doThrow;
39 import static org.mockito.Mockito.mock;
40 import static org.mockito.Mockito.never;
41 import static org.mockito.Mockito.times;
42 import static org.mockito.Mockito.verify;
43 import static org.mockito.Mockito.when;
45 import java.io.ByteArrayInputStream;
46 import java.io.ByteArrayOutputStream;
47 import java.io.ObjectInputStream;
48 import java.io.ObjectOutputStream;
49 import java.sql.Connection;
50 import java.sql.DriverManager;
51 import java.sql.PreparedStatement;
52 import java.sql.ResultSet;
53 import java.sql.SQLException;
54 import java.sql.SQLTransientException;
55 import java.util.ArrayList;
56 import java.util.List;
57 import java.util.Properties;
58 import java.util.concurrent.Executors;
59 import java.util.concurrent.RejectedExecutionException;
60 import java.util.concurrent.ScheduledExecutorService;
61 import java.util.concurrent.ScheduledFuture;
62 import java.util.concurrent.Semaphore;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.atomic.AtomicBoolean;
65 import java.util.concurrent.atomic.AtomicInteger;
66 import java.util.concurrent.atomic.AtomicReference;
67 import org.apache.commons.dbcp2.BasicDataSource;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.junit.runner.RunWith;
74 import org.kie.api.runtime.KieSession;
75 import org.mockito.ArgumentCaptor;
76 import org.mockito.Mock;
77 import org.mockito.junit.MockitoJUnitRunner;
78 import org.onap.policy.common.utils.services.OrderedServiceImpl;
79 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
80 import org.onap.policy.drools.core.PolicySession;
81 import org.onap.policy.drools.core.lock.Lock;
82 import org.onap.policy.drools.core.lock.LockCallback;
83 import org.onap.policy.drools.core.lock.LockState;
84 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
85 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
86 import org.onap.policy.drools.system.PolicyEngine;
87 import org.onap.policy.drools.system.PolicyEngineConstants;
88 import org.powermock.reflect.Whitebox;
90 @RunWith(MockitoJUnitRunner.class)
91 public class DistributedLockManagerTest {
92 private static final long EXPIRE_SEC = 900L;
93 private static final long RETRY_SEC = 60L;
94 private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
95 private static final String OTHER_HOST = "other-host";
96 private static final String OTHER_OWNER = "other-owner";
97 private static final String EXPECTED_EXCEPTION = "expected exception";
98 private static final String DB_CONNECTION =
99 "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
100 private static final String DB_USER = "user";
101 private static final String DB_PASSWORD = "password";
102 private static final String OWNER_KEY = "my key";
103 private static final String RESOURCE = "my resource";
104 private static final String RESOURCE2 = "my resource #2";
105 private static final String RESOURCE3 = "my resource #3";
106 private static final String RESOURCE4 = "my resource #4";
107 private static final String RESOURCE5 = "my resource #5";
108 private static final int HOLD_SEC = 100;
109 private static final int HOLD_SEC2 = 120;
110 private static final int MAX_THREADS = 5;
111 private static final int MAX_LOOPS = 100;
112 private static final boolean TRANSIENT = true;
113 private static final boolean PERMANENT = false;
115 // number of execute() calls before the first lock attempt
116 private static final int PRE_LOCK_EXECS = 1;
118 // number of execute() calls before the first schedule attempt
119 private static final int PRE_SCHED_EXECS = 1;
121 private static Connection conn = null;
122 private static ScheduledExecutorService saveExec;
123 private static ScheduledExecutorService realExec;
126 private PolicyEngine engine;
129 private KieSession kieSess;
132 private ScheduledExecutorService exsvc;
135 private ScheduledFuture<?> checker;
138 private LockCallback callback;
141 private BasicDataSource datasrc;
143 private DistributedLock lock;
144 private PolicySession session;
146 private AtomicInteger nactive;
147 private AtomicInteger nsuccesses;
148 private DistributedLockManager feature;
152 * Configures the location of the property files and creates the DB.
154 * @throws SQLException if the DB cannot be created
157 public static void setUpBeforeClass() throws SQLException {
158 SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
159 PolicyEngineConstants.getManager().configure(new Properties());
161 conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
163 try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
164 + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
165 + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
166 createStmt.executeUpdate();
169 saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
171 realExec = Executors.newScheduledThreadPool(3);
175 * Restores static fields.
178 public static void tearDownAfterClass() throws SQLException {
179 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
185 * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
188 * @throws SQLException if the lock records cannot be deleted from the DB
191 public void setUp() throws SQLException {
192 // grant() and deny() calls will come through here and be immediately executed
193 session = new PolicySession(null, null, kieSess) {
195 public void insertDrools(Object object) {
196 ((Runnable) object).run();
200 session.setPolicySession();
202 nactive = new AtomicInteger(0);
203 nsuccesses = new AtomicInteger(0);
207 feature = new MyLockingFeature(true);
211 public void tearDown() throws SQLException {
216 private void cleanDb() throws SQLException {
217 try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
218 stmt.executeUpdate();
222 private void shutdownFeature() {
223 if (feature != null) {
224 feature.afterStop(engine);
230 * Tests that the feature is found in the expected service sets.
233 public void testServiceApis() {
234 assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
235 .anyMatch(obj -> obj instanceof DistributedLockManager));
239 public void testGetSequenceNumber() {
240 assertEquals(1000, feature.getSequenceNumber());
244 public void testBeforeCreateLockManager() {
245 assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
249 * Tests beforeCreate(), when getProperties() throws a runtime exception.
252 public void testBeforeCreateLockManagerEx() {
255 feature = new MyLockingFeature(false) {
257 protected Properties getProperties(String fileName) {
258 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
262 Properties props = new Properties();
263 assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, props))
264 .isInstanceOf(DistributedLockManagerException.class);
268 public void testAfterStart() {
269 // verify that cleanup & expire check are both added to the queue
270 verify(exsvc).execute(any());
271 verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
275 * Tests afterStart(), when thread pool throws a runtime exception.
278 public void testAfterStartExInThreadPool() {
281 feature = new MyLockingFeature(false);
283 doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
285 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
289 public void testDeleteExpiredDbLocks() throws SQLException {
290 // add records: two expired, one not
291 insertRecord(RESOURCE, feature.getUuidString(), -1);
292 insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
293 insertRecord(RESOURCE3, OTHER_OWNER, 0);
294 insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
296 // get the clean-up function and execute it
297 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
298 verify(exsvc).execute(captor.capture());
300 long tbegin = System.currentTimeMillis();
301 Runnable action = captor.getValue();
304 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
305 assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
306 assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
307 assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
309 assertEquals(2, getRecordCount());
313 * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
317 public void testDeleteExpiredDbLocksEx() {
318 feature = new InvalidDbLockingFeature(TRANSIENT);
320 // get the clean-up function and execute it
321 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
322 verify(exsvc).execute(captor.capture());
324 Runnable action = captor.getValue();
326 // should not throw an exception
331 public void testAfterStop() {
333 verify(checker).cancel(anyBoolean());
335 feature = new DistributedLockManager();
337 // shutdown without calling afterStart()
343 * Tests afterStop(), when the data source throws an exception when close() is called.
347 public void testAfterStopEx() {
350 // use a data source that throws an exception when closed
351 feature = new InvalidDbLockingFeature(TRANSIENT);
353 assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
357 public void testCreateLock() throws SQLException {
358 verify(exsvc).execute(any());
360 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
361 assertTrue(lock.isWaiting());
363 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
365 // this lock should fail
366 LockCallback callback2 = mock(LockCallback.class);
367 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
368 assertTrue(lock2.isUnavailable());
369 verify(callback2, never()).lockAvailable(lock2);
370 verify(callback2).lockUnavailable(lock2);
372 // this should fail, too
373 LockCallback callback3 = mock(LockCallback.class);
374 DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
375 assertTrue(lock3.isUnavailable());
376 verify(callback3, never()).lockAvailable(lock3);
377 verify(callback3).lockUnavailable(lock3);
379 // no change to first
380 assertTrue(lock.isWaiting());
382 // no callbacks to the first lock
383 verify(callback, never()).lockAvailable(lock);
384 verify(callback, never()).lockUnavailable(lock);
386 assertTrue(lock.isWaiting());
387 assertEquals(0, getRecordCount());
390 assertTrue(lock.isActive());
391 assertEquals(1, getRecordCount());
393 verify(callback).lockAvailable(lock);
394 verify(callback, never()).lockUnavailable(lock);
396 // this should succeed
397 DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
398 assertTrue(lock4.isWaiting());
400 // after running checker, original records should still remain
401 runChecker(0, 0, EXPIRE_SEC);
402 assertEquals(1, getRecordCount());
403 verify(callback, never()).lockUnavailable(lock);
407 * Tests createLock() when the feature is not the latest instance.
410 public void testCreateLockNotLatestInstance() {
411 DistributedLockManager.setLatestInstance(null);
413 Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
414 assertTrue(lock.isUnavailable());
415 verify(callback, never()).lockAvailable(any());
416 verify(callback).lockUnavailable(lock);
420 public void testCheckExpired() throws SQLException {
421 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
424 LockCallback callback2 = mock(LockCallback.class);
425 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
428 LockCallback callback3 = mock(LockCallback.class);
429 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
432 LockCallback callback4 = mock(LockCallback.class);
433 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
436 LockCallback callback5 = mock(LockCallback.class);
437 final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
440 assertEquals(5, getRecordCount());
443 updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
445 // change host of another record
446 updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
448 // change uuid of another record
449 updateRecord(RESOURCE5, feature.getPdpName(), OTHER_OWNER, HOLD_SEC);
453 runChecker(0, 0, EXPIRE_SEC);
457 assertTrue(lock.isUnavailable());
458 assertTrue(lock2.isActive());
459 assertTrue(lock3.isUnavailable());
460 assertTrue(lock4.isActive());
461 assertTrue(lock5.isUnavailable());
467 verify(callback).lockUnavailable(lock);
468 verify(callback3).lockUnavailable(lock3);
469 verify(callback5).lockUnavailable(lock5);
471 verify(callback2, never()).lockUnavailable(lock2);
472 verify(callback4, never()).lockUnavailable(lock4);
475 // another check should have been scheduled, with the normal interval
476 runChecker(1, 0, EXPIRE_SEC);
480 * Tests checkExpired(), when schedule() throws an exception.
483 public void testCheckExpiredExecRejected() {
484 // arrange for execution to be rejected
485 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
486 .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
488 runChecker(0, 0, EXPIRE_SEC);
492 * Tests checkExpired(), when getConnection() throws an exception.
495 public void testCheckExpiredSqlEx() {
496 // use a data source that throws an exception when getConnection() is called
497 feature = new InvalidDbLockingFeature(TRANSIENT);
499 runChecker(0, 0, EXPIRE_SEC);
501 // it should have scheduled another check, sooner
502 runChecker(0, 0, RETRY_SEC);
506 * Tests checkExpired(), when getConnection() throws an exception and the feature is
510 public void testCheckExpiredSqlExFeatureStopped() {
511 // use a data source that throws an exception when getConnection() is called
512 feature = new InvalidDbLockingFeature(TRANSIENT) {
514 protected SQLException makeEx() {
516 return super.makeEx();
520 runChecker(0, 0, EXPIRE_SEC);
522 // it should NOT have scheduled another check
523 verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
527 public void testExpireLocks() throws SQLException {
528 AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
530 feature = new MyLockingFeature(true) {
532 protected BasicDataSource makeDataSource() throws Exception {
533 // get the real data source
534 BasicDataSource src2 = super.makeDataSource();
536 when(datasrc.getConnection()).thenAnswer(answer -> {
537 DistributedLock lck = freeLock.getAndSet(null);
546 return src2.getConnection();
553 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
556 LockCallback callback2 = mock(LockCallback.class);
557 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
560 LockCallback callback3 = mock(LockCallback.class);
561 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
562 // don't run doLock for lock3 - leave it in the waiting state
564 LockCallback callback4 = mock(LockCallback.class);
565 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
568 assertEquals(3, getRecordCount());
571 updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
573 // arrange to free lock4 while the checker is running
577 runChecker(0, 0, EXPIRE_SEC);
581 assertTrue(lock.isUnavailable());
582 assertTrue(lock2.isActive());
583 assertTrue(lock3.isWaiting());
584 assertTrue(lock4.isUnavailable());
587 verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
589 verify(callback).lockUnavailable(lock);
590 verify(callback2, never()).lockUnavailable(lock2);
591 verify(callback3, never()).lockUnavailable(lock3);
592 verify(callback4, never()).lockUnavailable(lock4);
596 public void testDistributedLockNoArgs() {
597 DistributedLock lock = new DistributedLock();
598 assertNull(lock.getResourceId());
599 assertNull(lock.getOwnerKey());
600 assertNull(lock.getCallback());
601 assertEquals(0, lock.getHoldSec());
605 public void testDistributedLock() {
606 assertThatIllegalArgumentException()
607 .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
608 .withMessageContaining("holdSec");
610 // should generate no exception
611 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
615 public void testDistributedLockSerializable() throws Exception {
616 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
617 lock = roundTrip(lock);
619 assertTrue(lock.isWaiting());
621 assertEquals(RESOURCE, lock.getResourceId());
622 assertEquals(OWNER_KEY, lock.getOwnerKey());
623 assertNull(lock.getCallback());
624 assertEquals(HOLD_SEC, lock.getHoldSec());
628 public void testGrant() {
629 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
630 assertFalse(lock.isActive());
632 // execute the doLock() call
635 assertTrue(lock.isActive());
637 // the callback for the lock should have been run in the foreground thread
638 verify(callback).lockAvailable(lock);
642 public void testDistributedLockDeny() {
644 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
646 // get another lock - should fail
647 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
649 assertTrue(lock.isUnavailable());
651 // the callback for the second lock should have been run in the foreground thread
652 verify(callback).lockUnavailable(lock);
654 // should only have a request for the first lock
655 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
659 public void testDistributedLockFree() {
660 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
662 assertTrue(lock.free());
663 assertTrue(lock.isUnavailable());
665 // run both requests associated with the lock
669 // should not have changed state
670 assertTrue(lock.isUnavailable());
672 // attempt to free it again
673 assertFalse(lock.free());
675 // should not have queued anything else
676 verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
678 // new lock should succeed
679 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
680 assertNotSame(lock2, lock);
681 assertTrue(lock2.isWaiting());
685 * Tests that free() works on a serialized lock with a new feature.
687 * @throws Exception if an error occurs
690 public void testDistributedLockFreeSerialized() throws Exception {
691 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
693 feature = new MyLockingFeature(true);
695 lock = roundTrip(lock);
696 assertTrue(lock.free());
697 assertTrue(lock.isUnavailable());
701 * Tests free() on a serialized lock without a feature.
703 * @throws Exception if an error occurs
706 public void testDistributedLockFreeNoFeature() throws Exception {
707 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
709 DistributedLockManager.setLatestInstance(null);
711 lock = roundTrip(lock);
712 assertFalse(lock.free());
713 assertTrue(lock.isUnavailable());
717 * Tests the case where the lock is freed and doUnlock called between the call to
718 * isUnavailable() and the call to compute().
721 public void testDistributedLockFreeUnlocked() {
722 feature = new FreeWithFreeLockingFeature(true);
724 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
726 assertFalse(lock.free());
727 assertTrue(lock.isUnavailable());
731 * Tests the case where the lock is freed, but doUnlock is not completed, between the
732 * call to isUnavailable() and the call to compute().
735 public void testDistributedLockFreeLockFreed() {
736 feature = new FreeWithFreeLockingFeature(false);
738 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
740 assertFalse(lock.free());
741 assertTrue(lock.isUnavailable());
745 public void testDistributedLockExtend() {
746 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
748 // lock2 should be denied - called back by this thread
749 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
750 verify(callback, never()).lockAvailable(lock2);
751 verify(callback).lockUnavailable(lock2);
753 // lock2 will still be denied - called back by this thread
754 lock2.extend(HOLD_SEC, callback);
755 verify(callback, times(2)).lockUnavailable(lock2);
757 // force lock2 to be active - should still be denied
758 Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
759 lock2.extend(HOLD_SEC, callback);
760 verify(callback, times(3)).lockUnavailable(lock2);
762 assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
763 .withMessageContaining("holdSec");
767 assertTrue(lock.isActive());
769 // now extend the first lock
770 LockCallback callback2 = mock(LockCallback.class);
771 lock.extend(HOLD_SEC2, callback2);
772 assertTrue(lock.isWaiting());
774 // execute doExtend()
776 lock.extend(HOLD_SEC2, callback2);
777 assertEquals(HOLD_SEC2, lock.getHoldSec());
778 verify(callback2).lockAvailable(lock);
779 verify(callback2, never()).lockUnavailable(lock);
783 * Tests that extend() works on a serialized lock with a new feature.
785 * @throws Exception if an error occurs
788 public void testDistributedLockExtendSerialized() throws Exception {
789 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
793 assertTrue(lock.isActive());
795 feature = new MyLockingFeature(true);
797 lock = roundTrip(lock);
798 assertTrue(lock.isActive());
800 LockCallback scallback = mock(LockCallback.class);
802 lock.extend(HOLD_SEC, scallback);
803 assertTrue(lock.isWaiting());
805 // run doExtend (in new feature)
807 assertTrue(lock.isActive());
809 verify(scallback).lockAvailable(lock);
810 verify(scallback, never()).lockUnavailable(lock);
814 * Tests extend() on a serialized lock without a feature.
816 * @throws Exception if an error occurs
819 public void testDistributedLockExtendNoFeature() throws Exception {
820 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
824 assertTrue(lock.isActive());
826 DistributedLockManager.setLatestInstance(null);
828 lock = roundTrip(lock);
829 assertTrue(lock.isActive());
831 LockCallback scallback = mock(LockCallback.class);
833 lock.extend(HOLD_SEC, scallback);
834 assertTrue(lock.isUnavailable());
836 verify(scallback, never()).lockAvailable(lock);
837 verify(scallback).lockUnavailable(lock);
841 * Tests the case where the lock is freed and doUnlock called between the call to
842 * isUnavailable() and the call to compute().
845 public void testDistributedLockExtendUnlocked() {
846 feature = new FreeWithFreeLockingFeature(true);
848 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
850 lock.extend(HOLD_SEC2, callback);
851 assertTrue(lock.isUnavailable());
852 verify(callback).lockUnavailable(lock);
856 * Tests the case where the lock is freed, but doUnlock is not completed, between the
857 * call to isUnavailable() and the call to compute().
860 public void testDistributedLockExtendLockFreed() {
861 feature = new FreeWithFreeLockingFeature(false);
863 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
865 lock.extend(HOLD_SEC2, callback);
866 assertTrue(lock.isUnavailable());
867 verify(callback).lockUnavailable(lock);
871 public void testDistributedLockScheduleRequest() {
872 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
875 verify(callback).lockAvailable(lock);
879 public void testDistributedLockRescheduleRequest() {
880 // use a data source that throws an exception when getConnection() is called
881 InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
884 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
886 // invoke doLock - should fail and reschedule
889 // should still be waiting
890 assertTrue(lock.isWaiting());
891 verify(callback, never()).lockUnavailable(lock);
893 // free the lock while doLock is executing
894 invfeat.freeLock = true;
896 // try scheduled request - should just invoke doUnlock
899 // should still be waiting
900 assertTrue(lock.isUnavailable());
901 verify(callback, never()).lockUnavailable(lock);
903 // should have scheduled a retry of doUnlock
904 verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
908 public void testDistributedLockGetNextRequest() {
909 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
912 * run doLock. This should cause getNextRequest() to be called twice, once with a
913 * request in the queue, and the second time with request=null.
919 * Tests getNextRequest(), where the same request is still in the queue the second
923 public void testDistributedLockGetNextRequestSameRequest() {
924 // force reschedule to be invoked
925 feature = new InvalidDbLockingFeature(TRANSIENT);
927 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
930 * run doLock. This should cause getNextRequest() to be called twice, once with a
931 * request in the queue, and the second time with the same request again.
935 verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
939 public void testDistributedLockDoRequest() {
940 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
942 assertTrue(lock.isWaiting());
944 // run doLock via doRequest
947 assertTrue(lock.isActive());
951 * Tests doRequest(), when doRequest() is already running within another thread.
954 public void testDistributedLockDoRequestBusy() {
956 * this feature will invoke a request in a background thread while it's being run
957 * in a foreground thread.
959 AtomicBoolean running = new AtomicBoolean(false);
960 AtomicBoolean returned = new AtomicBoolean(false);
962 feature = new MyLockingFeature(true) {
964 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
965 LockCallback callback) {
966 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
967 private static final long serialVersionUID = 1L;
970 protected boolean doDbInsert(Connection conn) throws SQLException {
972 // already inside the thread - don't recurse any further
973 return super.doDbInsert(conn);
978 Thread thread = new Thread(() -> {
979 // run doLock from within the new thread
982 thread.setDaemon(true);
985 // wait for the background thread to complete before continuing
988 } catch (InterruptedException ignore) {
989 Thread.currentThread().interrupt();
992 returned.set(!thread.isAlive());
994 return super.doDbInsert(conn);
1000 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1005 assertTrue(returned.get());
1009 * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1011 * @throws SQLException if an error occurs
1014 public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1015 // throw run-time exception
1016 when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1018 // use a data source that throws an exception when getConnection() is called
1019 feature = new MyLockingFeature(true) {
1021 protected BasicDataSource makeDataSource() {
1026 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1028 // invoke doLock - should NOT reschedule
1031 assertTrue(lock.isUnavailable());
1032 verify(callback).lockUnavailable(lock);
1034 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1038 * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1041 * @throws SQLException if an error occurs
1044 public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1045 // throw run-time exception
1046 when(datasrc.getConnection()).thenAnswer(answer -> {
1048 throw new IllegalStateException(EXPECTED_EXCEPTION);
1051 // use a data source that throws an exception when getConnection() is called
1052 feature = new MyLockingFeature(true) {
1054 protected BasicDataSource makeDataSource() {
1059 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1061 // invoke doLock - should NOT reschedule
1064 assertTrue(lock.isUnavailable());
1065 verify(callback, never()).lockUnavailable(lock);
1067 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1071 * Tests doRequest() when the retry count gets exhausted.
1074 public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1075 // use a data source that throws an exception when getConnection() is called
1076 feature = new InvalidDbLockingFeature(TRANSIENT);
1078 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1080 // invoke doLock - should fail and reschedule
1083 // should still be waiting
1084 assertTrue(lock.isWaiting());
1085 verify(callback, never()).lockUnavailable(lock);
1087 // try again, via SCHEDULER - first retry fails
1090 // should still be waiting
1091 assertTrue(lock.isWaiting());
1092 verify(callback, never()).lockUnavailable(lock);
1094 // try again, via SCHEDULER - final retry fails
1096 assertTrue(lock.isUnavailable());
1098 // now callback should have been called
1099 verify(callback).lockUnavailable(lock);
1103 * Tests doRequest() when a non-transient DB exception is thrown.
1106 public void testDistributedLockDoRequestNotTransient() {
1108 * use a data source that throws a PERMANENT exception when getConnection() is
1111 feature = new InvalidDbLockingFeature(PERMANENT);
1113 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1115 // invoke doLock - should fail
1118 assertTrue(lock.isUnavailable());
1119 verify(callback).lockUnavailable(lock);
1121 // should not have scheduled anything new
1122 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1123 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1127 public void testDistributedLockDoLock() throws SQLException {
1128 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1130 // invoke doLock - should simply do an insert
1131 long tbegin = System.currentTimeMillis();
1134 assertEquals(1, getRecordCount());
1135 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1136 verify(callback).lockAvailable(lock);
1140 * Tests doLock() when the lock is freed before doLock runs.
1142 * @throws SQLException if an error occurs
1145 public void testDistributedLockDoLockFreed() throws SQLException {
1146 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1148 lock.setState(LockState.UNAVAILABLE);
1150 // invoke doLock - should do nothing
1153 assertEquals(0, getRecordCount());
1155 verify(callback, never()).lockAvailable(lock);
1159 * Tests doLock() when a DB exception is thrown.
1162 public void testDistributedLockDoLockEx() {
1163 // use a data source that throws an exception when getConnection() is called
1164 feature = new InvalidDbLockingFeature(PERMANENT);
1166 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1168 // invoke doLock - should simply do an insert
1171 // lock should have failed due to exception
1172 verify(callback).lockUnavailable(lock);
1176 * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1180 public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1181 // insert an expired record
1182 insertRecord(RESOURCE, feature.getUuidString(), 0);
1184 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1186 // invoke doLock - should simply do an update
1188 verify(callback).lockAvailable(lock);
1192 * Tests doLock() when a locked record already exists.
1195 public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1196 // insert an expired record
1197 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1199 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1204 // lock should have failed because it's already locked
1205 verify(callback).lockUnavailable(lock);
1209 public void testDistributedLockDoUnlock() throws SQLException {
1210 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1217 // invoke doUnlock()
1218 long tbegin = System.currentTimeMillis();
1221 assertEquals(0, getRecordCount());
1222 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1224 assertTrue(lock.isUnavailable());
1226 // no more callbacks should have occurred
1227 verify(callback, times(1)).lockAvailable(lock);
1228 verify(callback, never()).lockUnavailable(lock);
1232 * Tests doUnlock() when a DB exception is thrown.
1236 public void testDistributedLockDoUnlockEx() {
1237 feature = new InvalidDbLockingFeature(PERMANENT);
1239 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1241 // do NOT invoke doLock() - it will fail without a DB connection
1245 // invoke doUnlock()
1248 assertTrue(lock.isUnavailable());
1250 // no more callbacks should have occurred
1251 verify(callback, never()).lockAvailable(lock);
1252 verify(callback, never()).lockUnavailable(lock);
1256 public void testDistributedLockDoExtend() throws SQLException {
1257 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1260 LockCallback callback2 = mock(LockCallback.class);
1261 lock.extend(HOLD_SEC2, callback2);
1264 long tbegin = System.currentTimeMillis();
1267 assertEquals(1, getRecordCount());
1268 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1270 assertTrue(lock.isActive());
1272 // no more callbacks should have occurred
1273 verify(callback).lockAvailable(lock);
1274 verify(callback, never()).lockUnavailable(lock);
1276 // extension should have succeeded
1277 verify(callback2).lockAvailable(lock);
1278 verify(callback2, never()).lockUnavailable(lock);
1282 * Tests doExtend() when the lock is freed before doExtend runs.
1284 * @throws SQLException if an error occurs
1287 public void testDistributedLockDoExtendFreed() throws SQLException {
1288 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1289 lock.extend(HOLD_SEC2, callback);
1291 lock.setState(LockState.UNAVAILABLE);
1293 // invoke doExtend - should do nothing
1296 assertEquals(0, getRecordCount());
1298 verify(callback, never()).lockAvailable(lock);
1302 * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1305 * @throws SQLException if an error occurs
1308 public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1309 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1312 LockCallback callback2 = mock(LockCallback.class);
1313 lock.extend(HOLD_SEC2, callback2);
1315 // delete the record so it's forced to re-insert it
1319 long tbegin = System.currentTimeMillis();
1322 assertEquals(1, getRecordCount());
1323 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1325 assertTrue(lock.isActive());
1327 // no more callbacks should have occurred
1328 verify(callback).lockAvailable(lock);
1329 verify(callback, never()).lockUnavailable(lock);
1331 // extension should have succeeded
1332 verify(callback2).lockAvailable(lock);
1333 verify(callback2, never()).lockUnavailable(lock);
1337 * Tests doExtend() when both update and insert fail.
1341 public void testDistributedLockDoExtendNeitherSucceeds() {
1343 * this feature will create a lock that returns false when doDbUpdate() is
1344 * invoked, or when doDbInsert() is invoked a second time
1346 feature = new MyLockingFeature(true) {
1348 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1349 LockCallback callback) {
1350 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1351 private static final long serialVersionUID = 1L;
1352 private int ntimes = 0;
1355 protected boolean doDbInsert(Connection conn) throws SQLException {
1360 return super.doDbInsert(conn);
1364 protected boolean doDbUpdate(Connection conn) {
1371 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1374 LockCallback callback2 = mock(LockCallback.class);
1375 lock.extend(HOLD_SEC2, callback2);
1380 assertTrue(lock.isUnavailable());
1382 // no more callbacks should have occurred
1383 verify(callback).lockAvailable(lock);
1384 verify(callback, never()).lockUnavailable(lock);
1386 // extension should have failed
1387 verify(callback2, never()).lockAvailable(lock);
1388 verify(callback2).lockUnavailable(lock);
1392 * Tests doExtend() when an exception occurs.
1394 * @throws SQLException if an error occurs
1397 public void testDistributedLockDoExtendEx() throws SQLException {
1398 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1402 * delete the record and insert one with a different owner, which will cause
1403 * doDbInsert() to throw an exception
1406 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1408 LockCallback callback2 = mock(LockCallback.class);
1409 lock.extend(HOLD_SEC2, callback2);
1414 assertTrue(lock.isUnavailable());
1416 // no more callbacks should have occurred
1417 verify(callback).lockAvailable(lock);
1418 verify(callback, never()).lockUnavailable(lock);
1420 // extension should have failed
1421 verify(callback2, never()).lockAvailable(lock);
1422 verify(callback2).lockUnavailable(lock);
1426 public void testDistributedLockToString() {
1427 String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1428 assertNotNull(text);
1429 assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1433 public void testMakeThreadPool() {
1434 // use a REAL feature to test this
1435 feature = new DistributedLockManager();
1437 // this should create a thread pool
1438 feature.beforeCreateLockManager(engine, new Properties());
1439 feature.afterStart(engine);
1441 assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
1445 * Performs a multi-threaded test of the locking facility.
1447 * @throws InterruptedException if the current thread is interrupted while waiting for
1448 * the background threads to complete
1451 public void testMultiThreaded() throws InterruptedException {
1452 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1454 feature = new DistributedLockManager();
1455 feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1456 feature.afterStart(PolicyEngineConstants.getManager());
1458 List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1459 for (int x = 0; x < MAX_THREADS; ++x) {
1460 threads.add(new MyThread());
1463 threads.forEach(Thread::start);
1465 for (MyThread thread : threads) {
1467 assertFalse(thread.isAlive());
1470 for (MyThread thread : threads) {
1471 if (thread.err != null) {
1476 assertTrue(nsuccesses.get() > 0);
1479 private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1480 boolean waitForLock) {
1481 return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1484 private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1485 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1486 try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1487 oos.writeObject(lock);
1490 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1491 try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1492 return (DistributedLock) ois.readObject();
1497 * Runs the checkExpired() action.
1499 * @param nskip number of actions in the work queue to skip
1500 * @param nadditional number of additional actions that appear in the work queue
1501 * <i>after</i> the checkExpired action
1502 * @param schedSec number of seconds for which the checker should have been scheduled
1504 private void runChecker(int nskip, int nadditional, long schedSec) {
1505 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1506 verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1507 Runnable action = captor.getAllValues().get(nskip);
1512 * Runs a lock action (e.g., doLock, doUnlock).
1514 * @param nskip number of actions in the work queue to skip
1515 * @param nadditional number of additional actions that appear in the work queue
1516 * <i>after</i> the desired action
1518 void runLock(int nskip, int nadditional) {
1519 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1520 verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1522 Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1527 * Runs a scheduled action (e.g., "retry" action).
1529 * @param nskip number of actions in the work queue to skip
1530 * @param nadditional number of additional actions that appear in the work queue
1531 * <i>after</i> the desired action
1533 void runSchedule(int nskip, int nadditional) {
1534 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1535 verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1537 Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1542 * Gets a count of the number of lock records in the DB.
1544 * @return the number of lock records in the DB
1545 * @throws SQLException if an error occurs accessing the DB
1547 private int getRecordCount() throws SQLException {
1548 try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1549 ResultSet result = stmt.executeQuery()) {
1551 if (result.next()) {
1552 return result.getInt(1);
1561 * Determines if there is a record for the given resource whose expiration time is in
1562 * the expected range.
1564 * @param resourceId ID of the resource of interest
1565 * @param uuidString UUID string of the owner
1566 * @param holdSec seconds for which the lock was to be held
1567 * @param tbegin earliest time, in milliseconds, at which the record could have been
1568 * inserted into the DB
1569 * @return {@code true} if a record is found, {@code false} otherwise
1570 * @throws SQLException if an error occurs accessing the DB
1572 private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1573 try (PreparedStatement stmt =
1574 conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1575 + " WHERE resourceId=? AND host=? AND owner=?")) {
1577 stmt.setString(1, resourceId);
1578 stmt.setString(2, feature.getPdpName());
1579 stmt.setString(3, uuidString);
1581 try (ResultSet result = stmt.executeQuery()) {
1582 if (result.next()) {
1583 int remaining = result.getInt(1);
1584 long maxDiff = System.currentTimeMillis() - tbegin;
1585 return (remaining >= 0 && holdSec - remaining <= maxDiff);
1595 * Inserts a record into the DB.
1597 * @param resourceId ID of the resource of interest
1598 * @param uuidString UUID string of the owner
1599 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1600 * @throws SQLException if an error occurs accessing the DB
1602 private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1603 this.insertRecord(resourceId, feature.getPdpName(), uuidString, expireOffset);
1606 private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1607 throws SQLException {
1608 try (PreparedStatement stmt =
1609 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1610 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1612 stmt.setString(1, resourceId);
1613 stmt.setString(2, hostName);
1614 stmt.setString(3, uuidString);
1615 stmt.setInt(4, expireOffset);
1617 assertEquals(1, stmt.executeUpdate());
1622 * Updates a record in the DB.
1624 * @param resourceId ID of the resource of interest
1625 * @param newUuid UUID string of the <i>new</i> owner
1626 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1627 * @throws SQLException if an error occurs accessing the DB
1629 private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1630 try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1631 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1633 stmt.setString(1, newHost);
1634 stmt.setString(2, newUuid);
1635 stmt.setInt(3, expireOffset);
1636 stmt.setString(4, resourceId);
1638 assertEquals(1, stmt.executeUpdate());
1643 * Feature that uses <i>exsvc</i> to execute requests.
1645 private class MyLockingFeature extends DistributedLockManager {
1647 public MyLockingFeature(boolean init) {
1650 exsvc = mock(ScheduledExecutorService.class);
1651 when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1652 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1655 beforeCreateLockManager(engine, new Properties());
1663 * Feature whose data source all throws exceptions.
1665 private class InvalidDbLockingFeature extends MyLockingFeature {
1666 private boolean isTransient;
1667 private boolean freeLock = false;
1669 public InvalidDbLockingFeature(boolean isTransient) {
1670 // pass "false" because we have to set the error code BEFORE calling
1674 this.isTransient = isTransient;
1676 this.beforeCreateLockManager(engine, new Properties());
1678 this.afterStart(engine);
1682 protected BasicDataSource makeDataSource() throws Exception {
1683 when(datasrc.getConnection()).thenAnswer(answer -> {
1692 doThrow(makeEx()).when(datasrc).close();
1697 protected SQLException makeEx() {
1699 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1702 return new SQLException(EXPECTED_EXCEPTION);
1708 * Feature whose locks free themselves while free() is already running.
1710 private class FreeWithFreeLockingFeature extends MyLockingFeature {
1711 private boolean relock;
1713 public FreeWithFreeLockingFeature(boolean relock) {
1715 this.relock = relock;
1719 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1720 LockCallback callback) {
1722 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1723 private static final long serialVersionUID = 1L;
1724 private boolean checked = false;
1727 public boolean isUnavailable() {
1729 return super.isUnavailable();
1734 // release and relock
1742 createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1752 * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1753 * extend it, and then unlock it.
1755 private class MyThread extends Thread {
1756 AssertionError err = null;
1765 for (int x = 0; x < MAX_LOOPS; ++x) {
1769 } catch (AssertionError e) {
1774 private void makeAttempt() {
1776 Semaphore sem = new Semaphore(0);
1778 LockCallback cb = new LockCallback() {
1780 public void lockAvailable(Lock lock) {
1785 public void lockUnavailable(Lock lock) {
1790 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1792 // wait for callback, whether available or unavailable
1793 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1794 if (!lock.isActive()) {
1798 nsuccesses.incrementAndGet();
1800 assertEquals(1, nactive.incrementAndGet());
1802 lock.extend(HOLD_SEC2, cb);
1803 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1804 assertTrue(lock.isActive());
1806 // decrement BEFORE free()
1807 nactive.decrementAndGet();
1809 assertTrue(lock.free());
1810 assertTrue(lock.isUnavailable());
1812 } catch (InterruptedException e) {
1813 Thread.currentThread().interrupt();
1814 throw new AssertionError("interrupted", e);