2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2023 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.distributed.locking;
24 import static org.assertj.core.api.Assertions.assertThat;
25 import static org.assertj.core.api.Assertions.assertThatCode;
26 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
27 import static org.assertj.core.api.Assertions.assertThatThrownBy;
28 import static org.junit.Assert.assertEquals;
29 import static org.junit.Assert.assertFalse;
30 import static org.junit.Assert.assertNotNull;
31 import static org.junit.Assert.assertNotSame;
32 import static org.junit.Assert.assertNull;
33 import static org.junit.Assert.assertSame;
34 import static org.junit.Assert.assertTrue;
35 import static org.mockito.ArgumentMatchers.any;
36 import static org.mockito.ArgumentMatchers.anyBoolean;
37 import static org.mockito.ArgumentMatchers.anyLong;
38 import static org.mockito.ArgumentMatchers.eq;
39 import static org.mockito.Mockito.doThrow;
40 import static org.mockito.Mockito.mock;
41 import static org.mockito.Mockito.never;
42 import static org.mockito.Mockito.times;
43 import static org.mockito.Mockito.verify;
44 import static org.mockito.Mockito.when;
46 import java.io.ByteArrayInputStream;
47 import java.io.ByteArrayOutputStream;
48 import java.io.ObjectInputStream;
49 import java.io.ObjectOutputStream;
50 import java.sql.Connection;
51 import java.sql.DriverManager;
52 import java.sql.PreparedStatement;
53 import java.sql.ResultSet;
54 import java.sql.SQLException;
55 import java.sql.SQLTransientException;
56 import java.util.ArrayList;
57 import java.util.List;
58 import java.util.Properties;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.RejectedExecutionException;
61 import java.util.concurrent.ScheduledExecutorService;
62 import java.util.concurrent.ScheduledFuture;
63 import java.util.concurrent.Semaphore;
64 import java.util.concurrent.TimeUnit;
65 import java.util.concurrent.atomic.AtomicBoolean;
66 import java.util.concurrent.atomic.AtomicInteger;
67 import java.util.concurrent.atomic.AtomicReference;
68 import org.apache.commons.dbcp2.BasicDataSource;
69 import org.junit.After;
70 import org.junit.AfterClass;
71 import org.junit.Before;
72 import org.junit.BeforeClass;
73 import org.junit.Test;
74 import org.junit.runner.RunWith;
75 import org.kie.api.runtime.KieSession;
76 import org.mockito.ArgumentCaptor;
77 import org.mockito.Mock;
78 import org.mockito.junit.MockitoJUnitRunner;
79 import org.onap.policy.common.utils.services.OrderedServiceImpl;
80 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
81 import org.onap.policy.drools.core.PolicySession;
82 import org.onap.policy.drools.core.lock.Lock;
83 import org.onap.policy.drools.core.lock.LockCallback;
84 import org.onap.policy.drools.core.lock.LockState;
85 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
86 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
87 import org.onap.policy.drools.system.PolicyEngine;
88 import org.onap.policy.drools.system.PolicyEngineConstants;
89 import org.springframework.test.util.ReflectionTestUtils;
91 @RunWith(MockitoJUnitRunner.class)
92 public class DistributedLockManagerTest {
93 private static final long EXPIRE_SEC = 900L;
94 private static final long RETRY_SEC = 60L;
95 private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
96 private static final String OTHER_HOST = "other-host";
97 private static final String OTHER_OWNER = "other-owner";
98 private static final String EXPECTED_EXCEPTION = "expected exception";
99 private static final String DB_CONNECTION =
100 "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
101 private static final String DB_USER = "user";
102 private static final String DB_PASSWORD = "password";
103 private static final String OWNER_KEY = "my key";
104 private static final String RESOURCE = "my resource";
105 private static final String RESOURCE2 = "my resource #2";
106 private static final String RESOURCE3 = "my resource #3";
107 private static final String RESOURCE4 = "my resource #4";
108 private static final String RESOURCE5 = "my resource #5";
109 private static final int HOLD_SEC = 100;
110 private static final int HOLD_SEC2 = 120;
111 private static final int MAX_THREADS = 5;
112 private static final int MAX_LOOPS = 100;
113 private static final boolean TRANSIENT = true;
114 private static final boolean PERMANENT = false;
116 // number of execute() calls before the first lock attempt
117 private static final int PRE_LOCK_EXECS = 1;
119 // number of execute() calls before the first schedule attempt
120 private static final int PRE_SCHED_EXECS = 1;
122 private static Connection conn = null;
123 private static ScheduledExecutorService saveExec;
124 private static ScheduledExecutorService realExec;
127 private PolicyEngine engine;
130 private KieSession kieSess;
133 private ScheduledExecutorService exsvc;
136 private ScheduledFuture<?> checker;
139 private LockCallback callback;
142 private BasicDataSource datasrc;
144 private DistributedLock lock;
145 private PolicySession session;
147 private AtomicInteger nactive;
148 private AtomicInteger nsuccesses;
149 private DistributedLockManager feature;
152 * Configures the location of the property files and creates the DB.
154 * @throws SQLException if the DB cannot be created
157 public static void setUpBeforeClass() throws SQLException {
158 SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
159 PolicyEngineConstants.getManager().configure(new Properties());
161 conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
163 try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
164 + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
165 + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
166 createStmt.executeUpdate();
169 saveExec = (ScheduledExecutorService) ReflectionTestUtils.getField(PolicyEngineConstants.getManager(),
170 POLICY_ENGINE_EXECUTOR_FIELD);
172 realExec = Executors.newScheduledThreadPool(3);
176 * Restores static fields.
179 public static void tearDownAfterClass() throws SQLException {
180 ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
186 * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
189 * @throws SQLException if the lock records cannot be deleted from the DB
192 public void setUp() throws SQLException {
193 // grant() and deny() calls will come through here and be immediately executed
194 session = new PolicySession(null, null, kieSess) {
196 public void insertDrools(Object object) {
197 ((Runnable) object).run();
201 session.setPolicySession();
203 nactive = new AtomicInteger(0);
204 nsuccesses = new AtomicInteger(0);
208 feature = new MyLockingFeature(true);
212 public void tearDown() throws SQLException {
217 private void cleanDb() throws SQLException {
218 try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
219 stmt.executeUpdate();
223 private void shutdownFeature() {
224 if (feature != null) {
225 feature.afterStop(engine);
231 * Tests that the feature is found in the expected service sets.
234 public void testServiceApis() {
235 assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
236 .anyMatch(obj -> obj instanceof DistributedLockManager));
240 public void testGetSequenceNumber() {
241 assertEquals(1000, feature.getSequenceNumber());
245 public void testBeforeCreateLockManager() {
246 assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
250 * Tests beforeCreate(), when getProperties() throws a runtime exception.
253 public void testBeforeCreateLockManagerEx() {
256 feature = new MyLockingFeature(false) {
258 protected Properties getProperties(String fileName) {
259 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
263 Properties props = new Properties();
264 assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, props))
265 .isInstanceOf(DistributedLockManagerException.class);
269 public void testAfterStart() {
270 // verify that cleanup & expire check are both added to the queue
271 verify(exsvc).execute(any());
272 verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
276 * Tests afterStart(), when thread pool throws a runtime exception.
279 public void testAfterStartExInThreadPool() {
282 feature = new MyLockingFeature(false);
284 doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
286 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
290 public void testDeleteExpiredDbLocks() throws SQLException {
291 // add records: two expired, one not
292 insertRecord(RESOURCE, feature.getUuidString(), -1);
293 insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
294 insertRecord(RESOURCE3, OTHER_OWNER, 0);
295 insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
297 // get the clean-up function and execute it
298 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
299 verify(exsvc).execute(captor.capture());
301 long tbegin = System.currentTimeMillis();
302 Runnable action = captor.getValue();
305 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
306 assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
307 assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
308 assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
310 assertEquals(2, getRecordCount());
314 * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
318 public void testDeleteExpiredDbLocksEx() {
319 feature = new InvalidDbLockingFeature(TRANSIENT);
321 // get the clean-up function and execute it
322 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
323 verify(exsvc).execute(captor.capture());
325 Runnable action = captor.getValue();
327 // should not throw an exception
332 public void testAfterStop() {
334 verify(checker).cancel(anyBoolean());
336 feature = new DistributedLockManager();
338 // shutdown without calling afterStart()
344 * Tests afterStop(), when the data source throws an exception when close() is called.
348 public void testAfterStopEx() {
351 // use a data source that throws an exception when closed
352 feature = new InvalidDbLockingFeature(TRANSIENT);
354 assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
358 public void testCreateLock() throws SQLException {
359 verify(exsvc).execute(any());
361 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
362 assertTrue(lock.isWaiting());
364 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
366 // this lock should fail
367 LockCallback callback2 = mock(LockCallback.class);
368 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
369 assertTrue(lock2.isUnavailable());
370 verify(callback2, never()).lockAvailable(lock2);
371 verify(callback2).lockUnavailable(lock2);
373 // this should fail, too
374 LockCallback callback3 = mock(LockCallback.class);
375 DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
376 assertTrue(lock3.isUnavailable());
377 verify(callback3, never()).lockAvailable(lock3);
378 verify(callback3).lockUnavailable(lock3);
380 // no change to first
381 assertTrue(lock.isWaiting());
383 // no callbacks to the first lock
384 verify(callback, never()).lockAvailable(lock);
385 verify(callback, never()).lockUnavailable(lock);
387 assertTrue(lock.isWaiting());
388 assertEquals(0, getRecordCount());
391 assertTrue(lock.isActive());
392 assertEquals(1, getRecordCount());
394 verify(callback).lockAvailable(lock);
395 verify(callback, never()).lockUnavailable(lock);
397 // this should succeed
398 DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
399 assertTrue(lock4.isWaiting());
401 // after running checker, original records should still remain
402 runChecker(0, 0, EXPIRE_SEC);
403 assertEquals(1, getRecordCount());
404 verify(callback, never()).lockUnavailable(lock);
408 * Tests createLock() when the feature is not the latest instance.
411 public void testCreateLockNotLatestInstance() {
412 DistributedLockManager.setLatestInstance(null);
414 Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
415 assertTrue(lock.isUnavailable());
416 verify(callback, never()).lockAvailable(any());
417 verify(callback).lockUnavailable(lock);
421 public void testCheckExpired() throws SQLException {
422 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
425 LockCallback callback2 = mock(LockCallback.class);
426 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
429 LockCallback callback3 = mock(LockCallback.class);
430 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
433 LockCallback callback4 = mock(LockCallback.class);
434 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
437 LockCallback callback5 = mock(LockCallback.class);
438 final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
441 assertEquals(5, getRecordCount());
444 updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
446 // change host of another record
447 updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
449 // change uuid of another record
450 updateRecord(RESOURCE5, feature.getPdpName(), OTHER_OWNER, HOLD_SEC);
453 runChecker(0, 0, EXPIRE_SEC);
456 assertTrue(lock.isUnavailable());
457 assertTrue(lock2.isActive());
458 assertTrue(lock3.isUnavailable());
459 assertTrue(lock4.isActive());
460 assertTrue(lock5.isUnavailable());
466 verify(callback).lockUnavailable(lock);
467 verify(callback3).lockUnavailable(lock3);
468 verify(callback5).lockUnavailable(lock5);
470 verify(callback2, never()).lockUnavailable(lock2);
471 verify(callback4, never()).lockUnavailable(lock4);
473 // another check should have been scheduled, with the normal interval
474 runChecker(1, 0, EXPIRE_SEC);
478 * Tests checkExpired(), when schedule() throws an exception.
481 public void testCheckExpiredExecRejected() {
482 // arrange for execution to be rejected
483 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
484 .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
486 runChecker(0, 0, EXPIRE_SEC);
490 * Tests checkExpired(), when getConnection() throws an exception.
493 public void testCheckExpiredSqlEx() {
494 // use a data source that throws an exception when getConnection() is called
495 feature = new InvalidDbLockingFeature(TRANSIENT);
497 runChecker(0, 0, EXPIRE_SEC);
499 // it should have scheduled another check, sooner
500 runChecker(0, 0, RETRY_SEC);
504 * Tests checkExpired(), when getConnection() throws an exception and the feature is
508 public void testCheckExpiredSqlExFeatureStopped() {
509 // use a data source that throws an exception when getConnection() is called
510 feature = new InvalidDbLockingFeature(TRANSIENT) {
512 protected SQLException makeEx() {
514 return super.makeEx();
518 runChecker(0, 0, EXPIRE_SEC);
520 // it should NOT have scheduled another check
521 verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
525 public void testExpireLocks() throws SQLException {
526 AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
528 feature = new MyLockingFeature(true) {
530 protected BasicDataSource makeDataSource() throws Exception {
531 // get the real data source
532 BasicDataSource src2 = super.makeDataSource();
534 when(datasrc.getConnection()).thenAnswer(answer -> {
535 DistributedLock lck = freeLock.getAndSet(null);
544 return src2.getConnection();
551 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
554 LockCallback callback2 = mock(LockCallback.class);
555 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
558 LockCallback callback3 = mock(LockCallback.class);
559 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
560 // don't run doLock for lock3 - leave it in the waiting state
562 LockCallback callback4 = mock(LockCallback.class);
563 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
566 assertEquals(3, getRecordCount());
569 updateRecord(RESOURCE, feature.getPdpName(), feature.getUuidString(), -1);
571 // arrange to free lock4 while the checker is running
575 runChecker(0, 0, EXPIRE_SEC);
578 assertTrue(lock.isUnavailable());
579 assertTrue(lock2.isActive());
580 assertTrue(lock3.isWaiting());
581 assertTrue(lock4.isUnavailable());
584 verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
586 verify(callback).lockUnavailable(lock);
587 verify(callback2, never()).lockUnavailable(lock2);
588 verify(callback3, never()).lockUnavailable(lock3);
589 verify(callback4, never()).lockUnavailable(lock4);
593 public void testDistributedLockNoArgs() {
594 DistributedLock lock = new DistributedLock();
595 assertNull(lock.getResourceId());
596 assertNull(lock.getOwnerKey());
597 assertNull(lock.getCallback());
598 assertEquals(0, lock.getHoldSec());
602 public void testDistributedLock() {
603 assertThatIllegalArgumentException()
604 .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
605 .withMessageContaining("holdSec");
607 // should generate no exception
608 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
612 public void testDistributedLockSerializable() throws Exception {
613 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
614 lock = roundTrip(lock);
616 assertTrue(lock.isWaiting());
618 assertEquals(RESOURCE, lock.getResourceId());
619 assertEquals(OWNER_KEY, lock.getOwnerKey());
620 assertNull(lock.getCallback());
621 assertEquals(HOLD_SEC, lock.getHoldSec());
625 public void testGrant() {
626 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
627 assertFalse(lock.isActive());
629 // execute the doLock() call
632 assertTrue(lock.isActive());
634 // the callback for the lock should have been run in the foreground thread
635 verify(callback).lockAvailable(lock);
639 public void testDistributedLockDeny() {
641 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
643 // get another lock - should fail
644 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
646 assertTrue(lock.isUnavailable());
648 // the callback for the second lock should have been run in the foreground thread
649 verify(callback).lockUnavailable(lock);
651 // should only have a request for the first lock
652 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
656 public void testDistributedLockFree() {
657 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
659 assertTrue(lock.free());
660 assertTrue(lock.isUnavailable());
662 // run both requests associated with the lock
666 // should not have changed state
667 assertTrue(lock.isUnavailable());
669 // attempt to free it again
670 assertFalse(lock.free());
672 // should not have queued anything else
673 verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
675 // new lock should succeed
676 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
677 assertNotSame(lock2, lock);
678 assertTrue(lock2.isWaiting());
682 * Tests that free() works on a serialized lock with a new feature.
684 * @throws Exception if an error occurs
687 public void testDistributedLockFreeSerialized() throws Exception {
688 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
690 feature = new MyLockingFeature(true);
692 lock = roundTrip(lock);
693 assertTrue(lock.free());
694 assertTrue(lock.isUnavailable());
698 * Tests free() on a serialized lock without a feature.
700 * @throws Exception if an error occurs
703 public void testDistributedLockFreeNoFeature() throws Exception {
704 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
706 DistributedLockManager.setLatestInstance(null);
708 lock = roundTrip(lock);
709 assertFalse(lock.free());
710 assertTrue(lock.isUnavailable());
714 * Tests the case where the lock is freed and doUnlock called between the call to
715 * isUnavailable() and the call to compute().
718 public void testDistributedLockFreeUnlocked() {
719 feature = new FreeWithFreeLockingFeature(true);
721 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
723 assertFalse(lock.free());
724 assertTrue(lock.isUnavailable());
728 * Tests the case where the lock is freed, but doUnlock is not completed, between the
729 * call to isUnavailable() and the call to compute().
732 public void testDistributedLockFreeLockFreed() {
733 feature = new FreeWithFreeLockingFeature(false);
735 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
737 assertFalse(lock.free());
738 assertTrue(lock.isUnavailable());
742 public void testDistributedLockExtend() {
743 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
745 // lock2 should be denied - called back by this thread
746 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
747 verify(callback, never()).lockAvailable(lock2);
748 verify(callback).lockUnavailable(lock2);
750 // lock2 will still be denied - called back by this thread
751 lock2.extend(HOLD_SEC, callback);
752 verify(callback, times(2)).lockUnavailable(lock2);
754 // force lock2 to be active - should still be denied
755 ReflectionTestUtils.setField(lock2, "state", LockState.ACTIVE);
756 lock2.extend(HOLD_SEC, callback);
757 verify(callback, times(3)).lockUnavailable(lock2);
759 assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
760 .withMessageContaining("holdSec");
764 assertTrue(lock.isActive());
766 // now extend the first lock
767 LockCallback callback2 = mock(LockCallback.class);
768 lock.extend(HOLD_SEC2, callback2);
769 assertTrue(lock.isWaiting());
771 // execute doExtend()
773 lock.extend(HOLD_SEC2, callback2);
774 assertEquals(HOLD_SEC2, lock.getHoldSec());
775 verify(callback2).lockAvailable(lock);
776 verify(callback2, never()).lockUnavailable(lock);
780 * Tests that extend() works on a serialized lock with a new feature.
782 * @throws Exception if an error occurs
785 public void testDistributedLockExtendSerialized() throws Exception {
786 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
790 assertTrue(lock.isActive());
792 feature = new MyLockingFeature(true);
794 lock = roundTrip(lock);
795 assertTrue(lock.isActive());
797 LockCallback scallback = mock(LockCallback.class);
799 lock.extend(HOLD_SEC, scallback);
800 assertTrue(lock.isWaiting());
802 // run doExtend (in new feature)
804 assertTrue(lock.isActive());
806 verify(scallback).lockAvailable(lock);
807 verify(scallback, never()).lockUnavailable(lock);
811 * Tests extend() on a serialized lock without a feature.
813 * @throws Exception if an error occurs
816 public void testDistributedLockExtendNoFeature() throws Exception {
817 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
821 assertTrue(lock.isActive());
823 DistributedLockManager.setLatestInstance(null);
825 lock = roundTrip(lock);
826 assertTrue(lock.isActive());
828 LockCallback scallback = mock(LockCallback.class);
830 lock.extend(HOLD_SEC, scallback);
831 assertTrue(lock.isUnavailable());
833 verify(scallback, never()).lockAvailable(lock);
834 verify(scallback).lockUnavailable(lock);
838 * Tests the case where the lock is freed and doUnlock called between the call to
839 * isUnavailable() and the call to compute().
842 public void testDistributedLockExtendUnlocked() {
843 feature = new FreeWithFreeLockingFeature(true);
845 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
847 lock.extend(HOLD_SEC2, callback);
848 assertTrue(lock.isUnavailable());
849 verify(callback).lockUnavailable(lock);
853 * Tests the case where the lock is freed, but doUnlock is not completed, between the
854 * call to isUnavailable() and the call to compute().
857 public void testDistributedLockExtendLockFreed() {
858 feature = new FreeWithFreeLockingFeature(false);
860 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
862 lock.extend(HOLD_SEC2, callback);
863 assertTrue(lock.isUnavailable());
864 verify(callback).lockUnavailable(lock);
868 public void testDistributedLockScheduleRequest() {
869 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
872 verify(callback).lockAvailable(lock);
876 public void testDistributedLockRescheduleRequest() {
877 // use a data source that throws an exception when getConnection() is called
878 InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
881 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
883 // invoke doLock - should fail and reschedule
886 // should still be waiting
887 assertTrue(lock.isWaiting());
888 verify(callback, never()).lockUnavailable(lock);
890 // free the lock while doLock is executing
891 invfeat.freeLock = true;
893 // try scheduled request - should just invoke doUnlock
896 // should still be waiting
897 assertTrue(lock.isUnavailable());
898 verify(callback, never()).lockUnavailable(lock);
900 // should have scheduled a retry of doUnlock
901 verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
905 public void testDistributedLockGetNextRequest() {
906 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
909 * run doLock. This should cause getNextRequest() to be called twice, once with a
910 * request in the queue, and the second time with request=null.
916 * Tests getNextRequest(), where the same request is still in the queue the second
920 public void testDistributedLockGetNextRequestSameRequest() {
921 // force reschedule to be invoked
922 feature = new InvalidDbLockingFeature(TRANSIENT);
924 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
927 * run doLock. This should cause getNextRequest() to be called twice, once with a
928 * request in the queue, and the second time with the same request again.
932 verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
936 public void testDistributedLockDoRequest() {
937 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
939 assertTrue(lock.isWaiting());
941 // run doLock via doRequest
944 assertTrue(lock.isActive());
948 * Tests doRequest(), when doRequest() is already running within another thread.
951 public void testDistributedLockDoRequestBusy() {
953 * this feature will invoke a request in a background thread while it's being run
954 * in a foreground thread.
956 AtomicBoolean running = new AtomicBoolean(false);
957 AtomicBoolean returned = new AtomicBoolean(false);
959 feature = new MyLockingFeature(true) {
961 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
962 LockCallback callback) {
963 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
964 private static final long serialVersionUID = 1L;
967 protected boolean doDbInsert(Connection conn) throws SQLException {
969 // already inside the thread - don't recurse any further
970 return super.doDbInsert(conn);
975 Thread thread = new Thread(() -> {
976 // run doLock from within the new thread
979 thread.setDaemon(true);
982 // wait for the background thread to complete before continuing
985 } catch (InterruptedException ignore) {
986 Thread.currentThread().interrupt();
989 returned.set(!thread.isAlive());
991 return super.doDbInsert(conn);
997 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1002 assertTrue(returned.get());
1006 * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1008 * @throws SQLException if an error occurs
1011 public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1012 // throw run-time exception
1013 when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1015 // use a data source that throws an exception when getConnection() is called
1016 feature = new MyLockingFeature(true) {
1018 protected BasicDataSource makeDataSource() {
1023 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1025 // invoke doLock - should NOT reschedule
1028 assertTrue(lock.isUnavailable());
1029 verify(callback).lockUnavailable(lock);
1031 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1035 * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1038 * @throws SQLException if an error occurs
1041 public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1042 // throw run-time exception
1043 when(datasrc.getConnection()).thenAnswer(answer -> {
1045 throw new IllegalStateException(EXPECTED_EXCEPTION);
1048 // use a data source that throws an exception when getConnection() is called
1049 feature = new MyLockingFeature(true) {
1051 protected BasicDataSource makeDataSource() {
1056 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1058 // invoke doLock - should NOT reschedule
1061 assertTrue(lock.isUnavailable());
1062 verify(callback, never()).lockUnavailable(lock);
1064 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1068 * Tests doRequest() when the retry count gets exhausted.
1071 public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1072 // use a data source that throws an exception when getConnection() is called
1073 feature = new InvalidDbLockingFeature(TRANSIENT);
1075 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1077 // invoke doLock - should fail and reschedule
1080 // should still be waiting
1081 assertTrue(lock.isWaiting());
1082 verify(callback, never()).lockUnavailable(lock);
1084 // try again, via SCHEDULER - first retry fails
1087 // should still be waiting
1088 assertTrue(lock.isWaiting());
1089 verify(callback, never()).lockUnavailable(lock);
1091 // try again, via SCHEDULER - final retry fails
1093 assertTrue(lock.isUnavailable());
1095 // now callback should have been called
1096 verify(callback).lockUnavailable(lock);
1100 * Tests doRequest() when a non-transient DB exception is thrown.
1103 public void testDistributedLockDoRequestNotTransient() {
1105 * use a data source that throws a PERMANENT exception when getConnection() is
1108 feature = new InvalidDbLockingFeature(PERMANENT);
1110 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1112 // invoke doLock - should fail
1115 assertTrue(lock.isUnavailable());
1116 verify(callback).lockUnavailable(lock);
1118 // should not have scheduled anything new
1119 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1120 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1124 public void testDistributedLockDoLock() throws SQLException {
1125 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1127 // invoke doLock - should simply do an insert
1128 long tbegin = System.currentTimeMillis();
1131 assertEquals(1, getRecordCount());
1132 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1133 verify(callback).lockAvailable(lock);
1137 * Tests doLock() when the lock is freed before doLock runs.
1139 * @throws SQLException if an error occurs
1142 public void testDistributedLockDoLockFreed() throws SQLException {
1143 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1145 lock.setState(LockState.UNAVAILABLE);
1147 // invoke doLock - should do nothing
1150 assertEquals(0, getRecordCount());
1152 verify(callback, never()).lockAvailable(lock);
1156 * Tests doLock() when a DB exception is thrown.
1159 public void testDistributedLockDoLockEx() {
1160 // use a data source that throws an exception when getConnection() is called
1161 feature = new InvalidDbLockingFeature(PERMANENT);
1163 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1165 // invoke doLock - should simply do an insert
1168 // lock should have failed due to exception
1169 verify(callback).lockUnavailable(lock);
1173 * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1177 public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1178 // insert an expired record
1179 insertRecord(RESOURCE, feature.getUuidString(), 0);
1181 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1183 // invoke doLock - should simply do an update
1185 verify(callback).lockAvailable(lock);
1189 * Tests doLock() when a locked record already exists.
1192 public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1193 // insert an expired record
1194 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1196 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1201 // lock should have failed because it's already locked
1202 verify(callback).lockUnavailable(lock);
1206 public void testDistributedLockDoUnlock() throws SQLException {
1207 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1214 // invoke doUnlock()
1215 long tbegin = System.currentTimeMillis();
1218 assertEquals(0, getRecordCount());
1219 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1221 assertTrue(lock.isUnavailable());
1223 // no more callbacks should have occurred
1224 verify(callback, times(1)).lockAvailable(lock);
1225 verify(callback, never()).lockUnavailable(lock);
1229 * Tests doUnlock() when a DB exception is thrown.
1233 public void testDistributedLockDoUnlockEx() {
1234 feature = new InvalidDbLockingFeature(PERMANENT);
1236 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1238 // do NOT invoke doLock() - it will fail without a DB connection
1242 // invoke doUnlock()
1245 assertTrue(lock.isUnavailable());
1247 // no more callbacks should have occurred
1248 verify(callback, never()).lockAvailable(lock);
1249 verify(callback, never()).lockUnavailable(lock);
1253 public void testDistributedLockDoExtend() throws SQLException {
1254 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1257 LockCallback callback2 = mock(LockCallback.class);
1258 lock.extend(HOLD_SEC2, callback2);
1261 long tbegin = System.currentTimeMillis();
1264 assertEquals(1, getRecordCount());
1265 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1267 assertTrue(lock.isActive());
1269 // no more callbacks should have occurred
1270 verify(callback).lockAvailable(lock);
1271 verify(callback, never()).lockUnavailable(lock);
1273 // extension should have succeeded
1274 verify(callback2).lockAvailable(lock);
1275 verify(callback2, never()).lockUnavailable(lock);
1279 * Tests doExtend() when the lock is freed before doExtend runs.
1281 * @throws SQLException if an error occurs
1284 public void testDistributedLockDoExtendFreed() throws SQLException {
1285 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1286 lock.extend(HOLD_SEC2, callback);
1288 lock.setState(LockState.UNAVAILABLE);
1290 // invoke doExtend - should do nothing
1293 assertEquals(0, getRecordCount());
1295 verify(callback, never()).lockAvailable(lock);
1299 * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1302 * @throws SQLException if an error occurs
1305 public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1306 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1309 LockCallback callback2 = mock(LockCallback.class);
1310 lock.extend(HOLD_SEC2, callback2);
1312 // delete the record so it's forced to re-insert it
1316 long tbegin = System.currentTimeMillis();
1319 assertEquals(1, getRecordCount());
1320 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1322 assertTrue(lock.isActive());
1324 // no more callbacks should have occurred
1325 verify(callback).lockAvailable(lock);
1326 verify(callback, never()).lockUnavailable(lock);
1328 // extension should have succeeded
1329 verify(callback2).lockAvailable(lock);
1330 verify(callback2, never()).lockUnavailable(lock);
1334 * Tests doExtend() when both update and insert fail.
1338 public void testDistributedLockDoExtendNeitherSucceeds() {
1340 * this feature will create a lock that returns false when doDbUpdate() is
1341 * invoked, or when doDbInsert() is invoked a second time
1343 feature = new MyLockingFeature(true) {
1345 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1346 LockCallback callback) {
1347 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1348 private static final long serialVersionUID = 1L;
1349 private int ntimes = 0;
1352 protected boolean doDbInsert(Connection conn) throws SQLException {
1357 return super.doDbInsert(conn);
1361 protected boolean doDbUpdate(Connection conn) {
1368 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1371 LockCallback callback2 = mock(LockCallback.class);
1372 lock.extend(HOLD_SEC2, callback2);
1377 assertTrue(lock.isUnavailable());
1379 // no more callbacks should have occurred
1380 verify(callback).lockAvailable(lock);
1381 verify(callback, never()).lockUnavailable(lock);
1383 // extension should have failed
1384 verify(callback2, never()).lockAvailable(lock);
1385 verify(callback2).lockUnavailable(lock);
1389 * Tests doExtend() when an exception occurs.
1391 * @throws SQLException if an error occurs
1394 public void testDistributedLockDoExtendEx() throws SQLException {
1395 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1399 * delete the record and insert one with a different owner, which will cause
1400 * doDbInsert() to throw an exception
1403 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1405 LockCallback callback2 = mock(LockCallback.class);
1406 lock.extend(HOLD_SEC2, callback2);
1411 assertTrue(lock.isUnavailable());
1413 // no more callbacks should have occurred
1414 verify(callback).lockAvailable(lock);
1415 verify(callback, never()).lockUnavailable(lock);
1417 // extension should have failed
1418 verify(callback2, never()).lockAvailable(lock);
1419 verify(callback2).lockUnavailable(lock);
1423 public void testDistributedLockToString() {
1424 String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1425 assertNotNull(text);
1426 assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1430 public void testMakeThreadPool() {
1431 // use a REAL feature to test this
1432 feature = new DistributedLockManager();
1434 // this should create a thread pool
1435 feature.beforeCreateLockManager(engine, new Properties());
1436 feature.afterStart(engine);
1438 assertThatCode(this::shutdownFeature).doesNotThrowAnyException();
1442 * Performs a multi-threaded test of the locking facility.
1444 * @throws InterruptedException if the current thread is interrupted while waiting for
1445 * the background threads to complete
1448 public void testMultiThreaded() throws InterruptedException {
1449 ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1451 feature = new DistributedLockManager();
1452 feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1453 feature.afterStart(PolicyEngineConstants.getManager());
1455 List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1456 for (int x = 0; x < MAX_THREADS; ++x) {
1457 threads.add(new MyThread());
1460 threads.forEach(Thread::start);
1462 for (MyThread thread : threads) {
1464 assertFalse(thread.isAlive());
1467 for (MyThread thread : threads) {
1468 if (thread.err != null) {
1473 assertTrue(nsuccesses.get() > 0);
1476 private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1477 boolean waitForLock) {
1478 return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1481 private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1482 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1483 try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1484 oos.writeObject(lock);
1487 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1488 try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1489 return (DistributedLock) ois.readObject();
1494 * Runs the checkExpired() action.
1496 * @param nskip number of actions in the work queue to skip
1497 * @param nadditional number of additional actions that appear in the work queue
1498 * <i>after</i> the checkExpired action
1499 * @param schedSec number of seconds for which the checker should have been scheduled
1501 private void runChecker(int nskip, int nadditional, long schedSec) {
1502 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1503 verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
1504 Runnable action = captor.getAllValues().get(nskip);
1509 * Runs a lock action (e.g., doLock, doUnlock).
1511 * @param nskip number of actions in the work queue to skip
1512 * @param nadditional number of additional actions that appear in the work queue
1513 * <i>after</i> the desired action
1515 void runLock(int nskip, int nadditional) {
1516 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1517 verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
1519 Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1524 * Runs a scheduled action (e.g., "retry" action).
1526 * @param nskip number of actions in the work queue to skip
1527 * @param nadditional number of additional actions that appear in the work queue
1528 * <i>after</i> the desired action
1530 void runSchedule(int nskip, int nadditional) {
1531 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1532 verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
1534 Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1539 * Gets a count of the number of lock records in the DB.
1541 * @return the number of lock records in the DB
1542 * @throws SQLException if an error occurs accessing the DB
1544 private int getRecordCount() throws SQLException {
1545 try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1546 ResultSet result = stmt.executeQuery()) {
1548 if (result.next()) {
1549 return result.getInt(1);
1558 * Determines if there is a record for the given resource whose expiration time is in
1559 * the expected range.
1561 * @param resourceId ID of the resource of interest
1562 * @param uuidString UUID string of the owner
1563 * @param holdSec seconds for which the lock was to be held
1564 * @param tbegin earliest time, in milliseconds, at which the record could have been
1565 * inserted into the DB
1566 * @return {@code true} if a record is found, {@code false} otherwise
1567 * @throws SQLException if an error occurs accessing the DB
1569 private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1570 try (PreparedStatement stmt =
1571 conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1572 + " WHERE resourceId=? AND host=? AND owner=?")) {
1574 stmt.setString(1, resourceId);
1575 stmt.setString(2, feature.getPdpName());
1576 stmt.setString(3, uuidString);
1578 try (ResultSet result = stmt.executeQuery()) {
1579 if (result.next()) {
1580 int remaining = result.getInt(1);
1581 long maxDiff = System.currentTimeMillis() - tbegin;
1582 return (remaining >= 0 && holdSec - remaining <= maxDiff);
1592 * Inserts a record into the DB.
1594 * @param resourceId ID of the resource of interest
1595 * @param uuidString UUID string of the owner
1596 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1597 * @throws SQLException if an error occurs accessing the DB
1599 private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1600 this.insertRecord(resourceId, feature.getPdpName(), uuidString, expireOffset);
1603 private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1604 throws SQLException {
1605 try (PreparedStatement stmt =
1606 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1607 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1609 stmt.setString(1, resourceId);
1610 stmt.setString(2, hostName);
1611 stmt.setString(3, uuidString);
1612 stmt.setInt(4, expireOffset);
1614 assertEquals(1, stmt.executeUpdate());
1619 * Updates a record in the DB.
1621 * @param resourceId ID of the resource of interest
1622 * @param newUuid UUID string of the <i>new</i> owner
1623 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1624 * @throws SQLException if an error occurs accessing the DB
1626 private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1627 try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1628 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1630 stmt.setString(1, newHost);
1631 stmt.setString(2, newUuid);
1632 stmt.setInt(3, expireOffset);
1633 stmt.setString(4, resourceId);
1635 assertEquals(1, stmt.executeUpdate());
1640 * Feature that uses <i>exsvc</i> to execute requests.
1642 private class MyLockingFeature extends DistributedLockManager {
1644 public MyLockingFeature(boolean init) {
1647 exsvc = mock(ScheduledExecutorService.class);
1648 when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1649 ReflectionTestUtils.setField(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1652 beforeCreateLockManager(engine, new Properties());
1660 * Feature whose data source all throws exceptions.
1662 private class InvalidDbLockingFeature extends MyLockingFeature {
1663 private boolean isTransient;
1664 private boolean freeLock = false;
1666 public InvalidDbLockingFeature(boolean isTransient) {
1667 // pass "false" because we have to set the error code BEFORE calling
1671 this.isTransient = isTransient;
1673 this.beforeCreateLockManager(engine, new Properties());
1675 this.afterStart(engine);
1679 protected BasicDataSource makeDataSource() throws Exception {
1680 when(datasrc.getConnection()).thenAnswer(answer -> {
1689 doThrow(makeEx()).when(datasrc).close();
1694 protected SQLException makeEx() {
1696 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1699 return new SQLException(EXPECTED_EXCEPTION);
1705 * Feature whose locks free themselves while free() is already running.
1707 private class FreeWithFreeLockingFeature extends MyLockingFeature {
1708 private boolean relock;
1710 public FreeWithFreeLockingFeature(boolean relock) {
1712 this.relock = relock;
1716 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1717 LockCallback callback) {
1719 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1720 private static final long serialVersionUID = 1L;
1721 private boolean checked = false;
1724 public boolean isUnavailable() {
1726 return super.isUnavailable();
1731 // release and relock
1739 createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1749 * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1750 * extend it, and then unlock it.
1752 private class MyThread extends Thread {
1753 AssertionError err = null;
1762 for (int x = 0; x < MAX_LOOPS; ++x) {
1766 } catch (AssertionError e) {
1771 private void makeAttempt() {
1773 Semaphore sem = new Semaphore(0);
1775 LockCallback cb = new LockCallback() {
1777 public void lockAvailable(Lock lock) {
1782 public void lockUnavailable(Lock lock) {
1787 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1789 // wait for callback, whether available or unavailable
1790 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1791 if (!lock.isActive()) {
1795 nsuccesses.incrementAndGet();
1797 assertEquals(1, nactive.incrementAndGet());
1799 lock.extend(HOLD_SEC2, cb);
1800 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1801 assertTrue(lock.isActive());
1803 // decrement BEFORE free()
1804 nactive.decrementAndGet();
1806 assertTrue(lock.free());
1807 assertTrue(lock.isUnavailable());
1809 } catch (InterruptedException e) {
1810 Thread.currentThread().interrupt();
1811 throw new AssertionError("interrupted", e);