2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNotNull;
30 import static org.junit.Assert.assertNull;
31 import static org.junit.Assert.assertSame;
32 import static org.junit.Assert.assertTrue;
33 import static org.mockito.ArgumentMatchers.any;
34 import static org.mockito.ArgumentMatchers.anyBoolean;
35 import static org.mockito.ArgumentMatchers.anyLong;
36 import static org.mockito.ArgumentMatchers.eq;
37 import static org.mockito.Mockito.doThrow;
38 import static org.mockito.Mockito.mock;
39 import static org.mockito.Mockito.never;
40 import static org.mockito.Mockito.times;
41 import static org.mockito.Mockito.verify;
42 import static org.mockito.Mockito.when;
44 import java.io.ByteArrayInputStream;
45 import java.io.ByteArrayOutputStream;
46 import java.io.ObjectInputStream;
47 import java.io.ObjectOutputStream;
48 import java.sql.Connection;
49 import java.sql.DriverManager;
50 import java.sql.PreparedStatement;
51 import java.sql.ResultSet;
52 import java.sql.SQLException;
53 import java.sql.SQLTransientException;
54 import java.util.ArrayList;
55 import java.util.List;
56 import java.util.Properties;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.RejectedExecutionException;
59 import java.util.concurrent.ScheduledExecutorService;
60 import java.util.concurrent.ScheduledFuture;
61 import java.util.concurrent.Semaphore;
62 import java.util.concurrent.TimeUnit;
63 import java.util.concurrent.atomic.AtomicBoolean;
64 import java.util.concurrent.atomic.AtomicInteger;
65 import java.util.concurrent.atomic.AtomicReference;
66 import org.apache.commons.dbcp2.BasicDataSource;
67 import org.junit.After;
68 import org.junit.AfterClass;
69 import org.junit.Before;
70 import org.junit.BeforeClass;
71 import org.junit.Test;
72 import org.kie.api.runtime.KieSession;
73 import org.mockito.ArgumentCaptor;
74 import org.mockito.Mock;
75 import org.mockito.MockitoAnnotations;
76 import org.onap.policy.common.utils.services.OrderedServiceImpl;
77 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
78 import org.onap.policy.drools.core.PolicySession;
79 import org.onap.policy.drools.core.lock.Lock;
80 import org.onap.policy.drools.core.lock.LockCallback;
81 import org.onap.policy.drools.core.lock.LockState;
82 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
83 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
84 import org.onap.policy.drools.system.PolicyEngine;
85 import org.onap.policy.drools.system.PolicyEngineConstants;
86 import org.powermock.reflect.Whitebox;
88 public class DistributedLockManagerTest {
89 private static final long EXPIRE_SEC = 900L;
90 private static final long RETRY_SEC = 60L;
91 private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
92 private static final String OTHER_HOST = "other-host";
93 private static final String OTHER_OWNER = "other-owner";
94 private static final String EXPECTED_EXCEPTION = "expected exception";
95 private static final String DB_CONNECTION =
96 "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
97 private static final String DB_USER = "user";
98 private static final String DB_PASSWORD = "password";
99 private static final String OWNER_KEY = "my key";
100 private static final String RESOURCE = "my resource";
101 private static final String RESOURCE2 = "my resource #2";
102 private static final String RESOURCE3 = "my resource #3";
103 private static final String RESOURCE4 = "my resource #4";
104 private static final String RESOURCE5 = "my resource #5";
105 private static final int HOLD_SEC = 100;
106 private static final int HOLD_SEC2 = 120;
107 private static final int MAX_THREADS = 5;
108 private static final int MAX_LOOPS = 100;
109 private static final boolean TRANSIENT = true;
110 private static final boolean PERMANENT = false;
112 // number of execute() calls before the first lock attempt
113 private static final int PRE_LOCK_EXECS = 1;
115 // number of execute() calls before the first schedule attempt
116 private static final int PRE_SCHED_EXECS = 1;
118 private static Connection conn = null;
119 private static ScheduledExecutorService saveExec;
120 private static ScheduledExecutorService realExec;
123 private PolicyEngine engine;
126 private KieSession kieSess;
129 private ScheduledExecutorService exsvc;
132 private ScheduledFuture<?> checker;
135 private LockCallback callback;
138 private BasicDataSource datasrc;
140 private DistributedLock lock;
141 private PolicySession session;
143 private AtomicInteger nactive;
144 private AtomicInteger nsuccesses;
145 private DistributedLockManager feature;
149 * Configures the location of the property files and creates the DB.
151 * @throws SQLException if the DB cannot be created
154 public static void setUpBeforeClass() throws SQLException {
155 SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
157 conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
159 try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
160 + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
161 + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
162 createStmt.executeUpdate();
165 saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
167 realExec = Executors.newScheduledThreadPool(3);
171 * Restores static fields.
174 public static void tearDownAfterClass() throws SQLException {
175 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
181 * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
184 * @throws SQLException if the lock records cannot be deleted from the DB
187 public void setUp() throws SQLException {
188 MockitoAnnotations.initMocks(this);
190 // grant() and deny() calls will come through here and be immediately executed
191 session = new PolicySession(null, null, kieSess) {
193 public void insertDrools(Object object) {
194 ((Runnable) object).run();
198 session.setPolicySession();
200 nactive = new AtomicInteger(0);
201 nsuccesses = new AtomicInteger(0);
205 feature = new MyLockingFeature(true);
209 public void tearDown() throws SQLException {
214 private void cleanDb() throws SQLException {
215 try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
216 stmt.executeUpdate();
220 private void shutdownFeature() {
221 if (feature != null) {
222 feature.afterStop(engine);
228 * Tests that the feature is found in the expected service sets.
231 public void testServiceApis() {
232 assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
233 .anyMatch(obj -> obj instanceof DistributedLockManager));
237 public void testGetSequenceNumber() {
238 assertEquals(1000, feature.getSequenceNumber());
242 public void testBeforeCreateLockManager() {
243 assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
247 * Tests beforeCreateLockManager(), when getProperties() throws a runtime exception.
250 public void testBeforeCreateLockManagerEx() {
253 feature = new MyLockingFeature(false) {
255 protected Properties getProperties(String fileName) {
256 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
260 assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
261 .isInstanceOf(DistributedLockManagerException.class);
265 public void testAfterStart() {
266 // verify that cleanup & expire check are both added to the queue
267 verify(exsvc).execute(any());
268 verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
272 * Tests afterStart(), when thread pool throws a runtime exception.
275 public void testAfterStartExInThreadPool() {
278 feature = new MyLockingFeature(false);
280 doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
282 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
286 public void testDeleteExpiredDbLocks() throws SQLException {
287 // add records: two expired, two not
288 insertRecord(RESOURCE, feature.getUuidString(), -1);
289 insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
290 insertRecord(RESOURCE3, OTHER_OWNER, 0);
291 insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
293 // get the clean-up function and execute it
294 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
295 verify(exsvc).execute(captor.capture());
297 long tbegin = System.currentTimeMillis();
298 Runnable action = captor.getValue();
301 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
302 assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
303 assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
304 assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
306 assertEquals(2, getRecordCount());
310 * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
312 * @throws SQLException if an error occurs
315 public void testDeleteExpiredDbLocksEx() throws SQLException {
316 feature = new InvalidDbLockingFeature(TRANSIENT);
318 // get the clean-up function and execute it
319 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
320 verify(exsvc).execute(captor.capture());
322 Runnable action = captor.getValue();
324 // should not throw an exception
329 public void testAfterStop() {
331 verify(checker).cancel(anyBoolean());
333 feature = new DistributedLockManager();
335 // shutdown without calling afterStart()
341 * Tests afterStop(), when the data source throws an exception when close() is called.
343 * @throws SQLException if an error occurs
346 public void testAfterStopEx() throws SQLException {
349 // use a data source that throws an exception when closed
350 feature = new InvalidDbLockingFeature(TRANSIENT);
352 assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
356 public void testCreateLock() throws SQLException {
357 verify(exsvc).execute(any());
359 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
360 assertTrue(lock.isWaiting());
362 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
364 // this lock should fail
365 LockCallback callback2 = mock(LockCallback.class);
366 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
367 assertTrue(lock2.isUnavailable());
368 verify(callback2, never()).lockAvailable(lock2);
369 verify(callback2).lockUnavailable(lock2);
371 // this should fail, too
372 LockCallback callback3 = mock(LockCallback.class);
373 DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
374 assertTrue(lock3.isUnavailable());
375 verify(callback3, never()).lockAvailable(lock3);
376 verify(callback3).lockUnavailable(lock3);
378 // no change to first
379 assertTrue(lock.isWaiting());
381 // no callbacks to the first lock
382 verify(callback, never()).lockAvailable(lock);
383 verify(callback, never()).lockUnavailable(lock);
385 assertTrue(lock.isWaiting());
386 assertEquals(0, getRecordCount());
389 assertTrue(lock.isActive());
390 assertEquals(1, getRecordCount());
392 verify(callback).lockAvailable(lock);
393 verify(callback, never()).lockUnavailable(lock);
395 // this should succeed
396 DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
397 assertTrue(lock4.isWaiting());
399 // after running checker, original records should still remain
400 runChecker(0, 0, EXPIRE_SEC);
401 assertEquals(1, getRecordCount());
402 verify(callback, never()).lockUnavailable(lock);
406 * Tests createLock() when the feature is not the latest instance.
409 public void testCreateLockNotLatestInstance() {
410 DistributedLockManager.setLatestInstance(null);
412 Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
413 assertTrue(lock.isUnavailable());
414 verify(callback, never()).lockAvailable(any());
415 verify(callback).lockUnavailable(lock);
419 public void testCheckExpired() throws SQLException {
420 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
423 LockCallback callback2 = mock(LockCallback.class);
424 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
427 LockCallback callback3 = mock(LockCallback.class);
428 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
431 LockCallback callback4 = mock(LockCallback.class);
432 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
435 LockCallback callback5 = mock(LockCallback.class);
436 final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
439 assertEquals(5, getRecordCount());
442 updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
444 // change host of another record
445 updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
447 // change uuid of another record
448 updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
452 runChecker(0, 0, EXPIRE_SEC);
456 assertTrue(lock.isUnavailable());
457 assertTrue(lock2.isActive());
458 assertTrue(lock3.isUnavailable());
459 assertTrue(lock4.isActive());
460 assertTrue(lock5.isUnavailable());
466 verify(callback).lockUnavailable(lock);
467 verify(callback3).lockUnavailable(lock3);
468 verify(callback5).lockUnavailable(lock5);
470 verify(callback2, never()).lockUnavailable(lock2);
471 verify(callback4, never()).lockUnavailable(lock4);
474 // another check should have been scheduled, with the normal interval
475 runChecker(1, 0, EXPIRE_SEC);
479 * Tests checkExpired(), when schedule() throws an exception.
482 public void testCheckExpiredExecRejected() {
483 // arrange for execution to be rejected
484 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
485 .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
487 runChecker(0, 0, EXPIRE_SEC);
491 * Tests checkExpired(), when getConnection() throws an exception.
494 public void testCheckExpiredSqlEx() {
495 // use a data source that throws an exception when getConnection() is called
496 feature = new InvalidDbLockingFeature(TRANSIENT);
498 runChecker(0, 0, EXPIRE_SEC);
500 // it should have scheduled another check, sooner
501 runChecker(0, 0, RETRY_SEC);
505 * Tests checkExpired(), when getConnection() throws an exception and the feature is
509 public void testCheckExpiredSqlExFeatureStopped() {
510 // use a data source that throws an exception when getConnection() is called
511 feature = new InvalidDbLockingFeature(TRANSIENT) {
513 protected SQLException makeEx() {
515 return super.makeEx();
519 runChecker(0, 0, EXPIRE_SEC);
521 // it should NOT have scheduled another check
522 verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
526 public void testExpireLocks() throws SQLException {
527 AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
529 feature = new MyLockingFeature(true) {
531 protected BasicDataSource makeDataSource() throws Exception {
532 // get the real data source
533 BasicDataSource src2 = super.makeDataSource();
535 when(datasrc.getConnection()).thenAnswer(answer -> {
536 DistributedLock lck = freeLock.getAndSet(null);
545 return src2.getConnection();
552 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
555 LockCallback callback2 = mock(LockCallback.class);
556 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
559 LockCallback callback3 = mock(LockCallback.class);
560 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
561 // don't run doLock for lock3 - leave it in the waiting state
563 LockCallback callback4 = mock(LockCallback.class);
564 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
567 assertEquals(3, getRecordCount());
570 updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
572 // arrange to free lock4 while the checker is running
576 runChecker(0, 0, EXPIRE_SEC);
580 assertTrue(lock.isUnavailable());
581 assertTrue(lock2.isActive());
582 assertTrue(lock3.isWaiting());
583 assertTrue(lock4.isUnavailable());
586 verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
588 verify(callback).lockUnavailable(lock);
589 verify(callback2, never()).lockUnavailable(lock2);
590 verify(callback3, never()).lockUnavailable(lock3);
591 verify(callback4, never()).lockUnavailable(lock4);
595 public void testDistributedLockNoArgs() {
596 DistributedLock lock = new DistributedLock();
597 assertNull(lock.getResourceId());
598 assertNull(lock.getOwnerKey());
599 assertNull(lock.getCallback());
600 assertEquals(0, lock.getHoldSec());
604 public void testDistributedLock() {
605 assertThatIllegalArgumentException()
606 .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
607 .withMessageContaining("holdSec");
609 // should generate no exception
610 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
614 public void testDistributedLockSerializable() throws Exception {
615 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
616 lock = roundTrip(lock);
618 assertTrue(lock.isWaiting());
620 assertEquals(RESOURCE, lock.getResourceId());
621 assertEquals(OWNER_KEY, lock.getOwnerKey());
622 assertNull(lock.getCallback());
623 assertEquals(HOLD_SEC, lock.getHoldSec());
627 public void testGrant() {
628 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
629 assertFalse(lock.isActive());
631 // execute the doLock() call
634 assertTrue(lock.isActive());
636 // the callback for the lock should have been run in the foreground thread
637 verify(callback).lockAvailable(lock);
641 public void testDistributedLockDeny() {
643 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
645 // get another lock - should fail
646 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
648 assertTrue(lock.isUnavailable());
650 // the callback for the second lock should have been run in the foreground thread
651 verify(callback).lockUnavailable(lock);
653 // should only have a request for the first lock
654 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
658 public void testDistributedLockFree() {
659 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
661 assertTrue(lock.free());
662 assertTrue(lock.isUnavailable());
664 // run both requests associated with the lock
668 // should not have changed state
669 assertTrue(lock.isUnavailable());
671 // attempt to free it again
672 assertFalse(lock.free());
674 // should not have queued anything else
675 verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
677 // new lock should succeed
678 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
679 assertTrue(lock2 != lock);
680 assertTrue(lock2.isWaiting());
684 * Tests that free() works on a serialized lock with a new feature.
686 * @throws Exception if an error occurs
689 public void testDistributedLockFreeSerialized() throws Exception {
690 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
692 feature = new MyLockingFeature(true);
694 lock = roundTrip(lock);
695 assertTrue(lock.free());
696 assertTrue(lock.isUnavailable());
700 * Tests free() on a serialized lock without a feature.
702 * @throws Exception if an error occurs
705 public void testDistributedLockFreeNoFeature() throws Exception {
706 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
708 DistributedLockManager.setLatestInstance(null);
710 lock = roundTrip(lock);
711 assertFalse(lock.free());
712 assertTrue(lock.isUnavailable());
716 * Tests the case where the lock is freed and doUnlock called between the call to
717 * isUnavailable() and the call to compute().
720 public void testDistributedLockFreeUnlocked() {
721 feature = new FreeWithFreeLockingFeature(true);
723 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
725 assertFalse(lock.free());
726 assertTrue(lock.isUnavailable());
730 * Tests the case where the lock is freed, but doUnlock is not completed, between the
731 * call to isUnavailable() and the call to compute().
734 public void testDistributedLockFreeLockFreed() {
735 feature = new FreeWithFreeLockingFeature(false);
737 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
739 assertFalse(lock.free());
740 assertTrue(lock.isUnavailable());
744 public void testDistributedLockExtend() {
745 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
747 // lock2 should be denied - called back by this thread
748 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
749 verify(callback, never()).lockAvailable(lock2);
750 verify(callback).lockUnavailable(lock2);
752 // lock2 will still be denied - called back by this thread
753 lock2.extend(HOLD_SEC, callback);
754 verify(callback, times(2)).lockUnavailable(lock2);
756 // force lock2 to be active - should still be denied
757 Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
758 lock2.extend(HOLD_SEC, callback);
759 verify(callback, times(3)).lockUnavailable(lock2);
761 assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
762 .withMessageContaining("holdSec");
766 assertTrue(lock.isActive());
768 // now extend the first lock
769 LockCallback callback2 = mock(LockCallback.class);
770 lock.extend(HOLD_SEC2, callback2);
771 assertTrue(lock.isWaiting());
773 // execute doExtend()
775 lock.extend(HOLD_SEC2, callback2);
776 assertEquals(HOLD_SEC2, lock.getHoldSec());
777 verify(callback2).lockAvailable(lock);
778 verify(callback2, never()).lockUnavailable(lock);
782 * Tests that extend() works on a serialized lock with a new feature.
784 * @throws Exception if an error occurs
787 public void testDistributedLockExtendSerialized() throws Exception {
788 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
792 assertTrue(lock.isActive());
794 feature = new MyLockingFeature(true);
796 lock = roundTrip(lock);
797 assertTrue(lock.isActive());
799 LockCallback scallback = mock(LockCallback.class);
801 lock.extend(HOLD_SEC, scallback);
802 assertTrue(lock.isWaiting());
804 // run doExtend (in new feature)
806 assertTrue(lock.isActive());
808 verify(scallback).lockAvailable(lock);
809 verify(scallback, never()).lockUnavailable(lock);
813 * Tests extend() on a serialized lock without a feature.
815 * @throws Exception if an error occurs
818 public void testDistributedLockExtendNoFeature() throws Exception {
819 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
823 assertTrue(lock.isActive());
825 DistributedLockManager.setLatestInstance(null);
827 lock = roundTrip(lock);
828 assertTrue(lock.isActive());
830 LockCallback scallback = mock(LockCallback.class);
832 lock.extend(HOLD_SEC, scallback);
833 assertTrue(lock.isUnavailable());
835 verify(scallback, never()).lockAvailable(lock);
836 verify(scallback).lockUnavailable(lock);
840 * Tests the case where the lock is freed and doUnlock called between the call to
841 * isUnavailable() and the call to compute().
844 public void testDistributedLockExtendUnlocked() {
845 feature = new FreeWithFreeLockingFeature(true);
847 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
849 lock.extend(HOLD_SEC2, callback);
850 assertTrue(lock.isUnavailable());
851 verify(callback).lockUnavailable(lock);
855 * Tests the case where the lock is freed, but doUnlock is not completed, between the
856 * call to isUnavailable() and the call to compute().
859 public void testDistributedLockExtendLockFreed() {
860 feature = new FreeWithFreeLockingFeature(false);
862 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
864 lock.extend(HOLD_SEC2, callback);
865 assertTrue(lock.isUnavailable());
866 verify(callback).lockUnavailable(lock);
870 public void testDistributedLockScheduleRequest() {
871 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
874 verify(callback).lockAvailable(lock);
878 public void testDistributedLockRescheduleRequest() throws SQLException {
879 // use a data source that throws an exception when getConnection() is called
880 InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
883 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
885 // invoke doLock - should fail and reschedule
888 // should still be waiting
889 assertTrue(lock.isWaiting());
890 verify(callback, never()).lockUnavailable(lock);
892 // free the lock while doLock is executing
893 invfeat.freeLock = true;
895 // try scheduled request - should just invoke doUnlock
898 // should now be unavailable
899 assertTrue(lock.isUnavailable());
900 verify(callback, never()).lockUnavailable(lock);
902 // should have scheduled a retry of doUnlock
903 verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
907 public void testDistributedLockGetNextRequest() {
908 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
911 * run doLock. This should cause getNextRequest() to be called twice, once with a
912 * request in the queue, and the second time with request=null.
918 * Tests getNextRequest(), where the same request is still in the queue the second
922 public void testDistributedLockGetNextRequestSameRequest() {
923 // force reschedule to be invoked
924 feature = new InvalidDbLockingFeature(TRANSIENT);
926 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
929 * run doLock. This should cause getNextRequest() to be called twice, once with a
930 * request in the queue, and the second time with the same request again.
934 verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
938 public void testDistributedLockDoRequest() throws SQLException {
939 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
941 assertTrue(lock.isWaiting());
943 // run doLock via doRequest
946 assertTrue(lock.isActive());
950 * Tests doRequest(), when doRequest() is already running within another thread.
953 public void testDistributedLockDoRequestBusy() {
955 * this feature will invoke a request in a background thread while it's being run
956 * in a foreground thread.
958 AtomicBoolean running = new AtomicBoolean(false);
959 AtomicBoolean returned = new AtomicBoolean(false);
961 feature = new MyLockingFeature(true) {
963 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
964 LockCallback callback) {
965 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
966 private static final long serialVersionUID = 1L;
969 protected boolean doDbInsert(Connection conn) throws SQLException {
971 // already inside the thread - don't recurse any further
972 return super.doDbInsert(conn);
977 Thread thread = new Thread(() -> {
978 // run doLock from within the new thread
981 thread.setDaemon(true);
984 // wait for the background thread to complete before continuing
987 } catch (InterruptedException ignore) {
988 Thread.currentThread().interrupt();
991 returned.set(!thread.isAlive());
993 return super.doDbInsert(conn);
999 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1004 assertTrue(returned.get());
1008 * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1010 * @throws SQLException if an error occurs
1013 public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1014 // throw run-time exception
1015 when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1017 // use a data source that throws an exception when getConnection() is called
1018 feature = new MyLockingFeature(true) {
1020 protected BasicDataSource makeDataSource() throws Exception {
1025 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1027 // invoke doLock - should NOT reschedule
1030 assertTrue(lock.isUnavailable());
1031 verify(callback).lockUnavailable(lock);
1033 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1037 * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE
1040 * @throws SQLException if an error occurs
1043 public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1044 // throw run-time exception
1045 when(datasrc.getConnection()).thenAnswer(answer -> {
1047 throw new IllegalStateException(EXPECTED_EXCEPTION);
1050 // use a data source that throws an exception when getConnection() is called
1051 feature = new MyLockingFeature(true) {
1053 protected BasicDataSource makeDataSource() throws Exception {
1058 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1060 // invoke doLock - should NOT reschedule
1063 assertTrue(lock.isUnavailable());
1064 verify(callback, never()).lockUnavailable(lock);
1066 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1070 * Tests doRequest() when the retry count gets exhausted.
1073 public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1074 // use a data source that throws an exception when getConnection() is called
1075 feature = new InvalidDbLockingFeature(TRANSIENT);
1077 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1079 // invoke doLock - should fail and reschedule
1082 // should still be waiting
1083 assertTrue(lock.isWaiting());
1084 verify(callback, never()).lockUnavailable(lock);
1086 // try again, via SCHEDULER - first retry fails
1089 // should still be waiting
1090 assertTrue(lock.isWaiting());
1091 verify(callback, never()).lockUnavailable(lock);
1093 // try again, via SCHEDULER - final retry fails
1095 assertTrue(lock.isUnavailable());
1097 // now callback should have been called
1098 verify(callback).lockUnavailable(lock);
1102 * Tests doRequest() when a non-transient DB exception is thrown.
1105 public void testDistributedLockDoRequestNotTransient() {
1107 * use a data source that throws a PERMANENT exception when getConnection() is
1110 feature = new InvalidDbLockingFeature(PERMANENT);
1112 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1114 // invoke doLock - should fail
1117 assertTrue(lock.isUnavailable());
1118 verify(callback).lockUnavailable(lock);
1120 // should not have scheduled anything new
1121 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1122 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1126 public void testDistributedLockDoLock() throws SQLException {
1127 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1129 // invoke doLock - should simply do an insert
1130 long tbegin = System.currentTimeMillis();
1133 assertEquals(1, getRecordCount());
1134 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1135 verify(callback).lockAvailable(lock);
1139 * Tests doLock() when the lock is freed before doLock runs.
1141 * @throws SQLException if an error occurs
1144 public void testDistributedLockDoLockFreed() throws SQLException {
1145 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1147 lock.setState(LockState.UNAVAILABLE);
1149 // invoke doLock - should do nothing
1152 assertEquals(0, getRecordCount());
1154 verify(callback, never()).lockAvailable(lock);
1158 * Tests doLock() when a DB exception is thrown.
1161 public void testDistributedLockDoLockEx() {
1162 // use a data source that throws an exception when getConnection() is called
1163 feature = new InvalidDbLockingFeature(PERMANENT);
1165 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1167 // invoke doLock - should fail, because getConnection() throws an exception
1170 // lock should have failed due to exception
1171 verify(callback).lockUnavailable(lock);
1175 * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1179 public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1180 // insert an expired record
1181 insertRecord(RESOURCE, feature.getUuidString(), 0);
1183 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1185 // invoke doLock - should simply do an update
1187 verify(callback).lockAvailable(lock);
1191 * Tests doLock() when a locked record already exists.
1194 public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1195 // insert a record owned by OTHER_OWNER whose expiration is HOLD_SEC seconds from now
1196 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1198 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1203 // lock should have failed because it's already locked
1204 verify(callback).lockUnavailable(lock);
1208 public void testDistributedLockDoUnlock() throws SQLException {
1209 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1216 // invoke doUnlock()
1217 long tbegin = System.currentTimeMillis();
1220 assertEquals(0, getRecordCount());
1221 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1223 assertTrue(lock.isUnavailable());
1225 // no more callbacks should have occurred
1226 verify(callback, times(1)).lockAvailable(lock);
1227 verify(callback, never()).lockUnavailable(lock);
1231 * Tests doUnlock() when a DB exception is thrown.
1233 * @throws SQLException if an error occurs
1236 public void testDistributedLockDoUnlockEx() throws SQLException {
1237 feature = new InvalidDbLockingFeature(PERMANENT);
1239 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1241 // do NOT invoke doLock() - it will fail without a DB connection
1245 // invoke doUnlock()
1248 assertTrue(lock.isUnavailable());
1250 // no more callbacks should have occurred
1251 verify(callback, never()).lockAvailable(lock);
1252 verify(callback, never()).lockUnavailable(lock);
1256 public void testDistributedLockDoExtend() throws SQLException {
1257 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1260 LockCallback callback2 = mock(LockCallback.class);
1261 lock.extend(HOLD_SEC2, callback2);
1264 long tbegin = System.currentTimeMillis();
1267 assertEquals(1, getRecordCount());
1268 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1270 assertTrue(lock.isActive());
1272 // no more callbacks should have occurred
1273 verify(callback).lockAvailable(lock);
1274 verify(callback, never()).lockUnavailable(lock);
1276 // extension should have succeeded
1277 verify(callback2).lockAvailable(lock);
1278 verify(callback2, never()).lockUnavailable(lock);
1282 * Tests doExtend() when the lock is freed before doExtend runs.
1284 * @throws SQLException if an error occurs
1287 public void testDistributedLockDoExtendFreed() throws SQLException {
1288 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1289 lock.extend(HOLD_SEC2, callback);
1291 lock.setState(LockState.UNAVAILABLE);
1293 // invoke doExtend - should do nothing
1296 assertEquals(0, getRecordCount());
1298 verify(callback, never()).lockAvailable(lock);
1302 * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1305 * @throws SQLException if an error occurs
1308 public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1309 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1312 LockCallback callback2 = mock(LockCallback.class);
1313 lock.extend(HOLD_SEC2, callback2);
1315 // delete the record so it's forced to re-insert it
1319 long tbegin = System.currentTimeMillis();
1322 assertEquals(1, getRecordCount());
1323 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1325 assertTrue(lock.isActive());
1327 // no more callbacks should have occurred
1328 verify(callback).lockAvailable(lock);
1329 verify(callback, never()).lockUnavailable(lock);
1331 // extension should have succeeded
1332 verify(callback2).lockAvailable(lock);
1333 verify(callback2, never()).lockUnavailable(lock);
1337 * Tests doExtend() when both update and insert fail.
1339 * @throws SQLException if an error occurs
1342 public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1344 * this feature will create a lock that returns false when doDbUpdate() is
1345 * invoked, or when doDbInsert() is invoked a second time
1347 feature = new MyLockingFeature(true) {
1349 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1350 LockCallback callback) {
1351 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1352 private static final long serialVersionUID = 1L;
1353 private int ntimes = 0;
1356 protected boolean doDbInsert(Connection conn) throws SQLException {
1361 return super.doDbInsert(conn);
1365 protected boolean doDbUpdate(Connection conn) {
1372 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1375 LockCallback callback2 = mock(LockCallback.class);
1376 lock.extend(HOLD_SEC2, callback2);
1381 assertTrue(lock.isUnavailable());
1383 // no more callbacks should have occurred
1384 verify(callback).lockAvailable(lock);
1385 verify(callback, never()).lockUnavailable(lock);
1387 // extension should have failed
1388 verify(callback2, never()).lockAvailable(lock);
1389 verify(callback2).lockUnavailable(lock);
1393 * Tests doExtend() when an exception occurs.
1395 * @throws SQLException if an error occurs
1398 public void testDistributedLockDoExtendEx() throws SQLException {
1399 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1403 * delete the record and insert one with a different owner, which will cause
1404 * doDbInsert() to throw an exception
1407 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1409 LockCallback callback2 = mock(LockCallback.class);
1410 lock.extend(HOLD_SEC2, callback2);
1415 assertTrue(lock.isUnavailable());
1417 // no more callbacks should have occurred
1418 verify(callback).lockAvailable(lock);
1419 verify(callback, never()).lockUnavailable(lock);
1421 // extension should have failed
1422 verify(callback2, never()).lockAvailable(lock);
1423 verify(callback2).lockUnavailable(lock);
1427 public void testDistributedLockToString() {
1428 String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1429 assertNotNull(text);
1430 assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
1434 public void testMakeThreadPool() {
1435 // use a REAL feature to test this
1436 feature = new DistributedLockManager();
1438 // this should create a thread pool
1439 feature.beforeCreateLockManager(engine, new Properties());
1440 feature.afterStart(engine);
1442 assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
1446 * Performs a multi-threaded test of the locking facility.
1448 * @throws InterruptedException if the current thread is interrupted while waiting for
1449 * the background threads to complete
1452 public void testMultiThreaded() throws InterruptedException {
1453 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1455 feature = new DistributedLockManager();
1456 feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1457 feature.afterStart(PolicyEngineConstants.getManager());
1459 List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1460 for (int x = 0; x < MAX_THREADS; ++x) {
1461 threads.add(new MyThread());
1464 threads.forEach(Thread::start);
1466 for (MyThread thread : threads) {
1468 assertFalse(thread.isAlive());
1471 for (MyThread thread : threads) {
1472 if (thread.err != null) {
1477 assertTrue(nsuccesses.get() > 0);
// Convenience wrapper: asks the current "feature" to create the lock and casts the
// result to DistributedLock.
1480 private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1481 boolean waitForLock) {
1482 return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
// Serializes the given lock to an in-memory byte array and then deserializes it,
// returning the object that was read back.
1485 private DistributedLock roundTrip(DistributedLock lock) throws Exception {
// write the lock into an in-memory buffer
1486 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1487 try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1488 oos.writeObject(lock);
// read the lock back from the bytes that were just written
1491 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1492 try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1493 return (DistributedLock) ois.readObject();
1498 * Runs the checkExpired() action.
1500 * @param nskip number of actions in the work queue to skip
1501 * @param nadditional number of additional actions that appear in the work queue
1502 * <i>after</i> the checkExpired action
1503 * @param schedSec number of seconds for which the checker should have been scheduled
1505 private void runChecker(int nskip, int nadditional, long schedSec) {
// capture every action scheduled with a delay of schedSec seconds, verifying the expected total
1506 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
1507 verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
// the action of interest is the one immediately following the skipped ones
1508 Runnable action = captor.getAllValues().get(nskip);
1513 * Runs a lock action (e.g., doLock, doUnlock).
1515 * @param nskip number of actions in the work queue to skip
1516 * @param nadditional number of additional actions that appear in the work queue
1517 * <i>after</i> the desired action
1519 void runLock(int nskip, int nadditional) {
1520 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// PRE_LOCK_EXECS execute() calls are expected in addition to nskip + nadditional + 1
1521 verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
// step over those PRE_LOCK_EXECS entries, plus the skipped ones, to reach the desired action
1523 Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
1528 * Runs a scheduled action (e.g., "retry" action).
1530 * @param nskip number of actions in the work queue to skip
1531 * @param nadditional number of additional actions that appear in the work queue
1532 * <i>after</i> the desired action
1534 void runSchedule(int nskip, int nadditional) {
1535 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// matches schedule() calls with any delay and any time unit; PRE_SCHED_EXECS of them are
// expected in addition to nskip + nadditional + 1
1536 verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
// step over those PRE_SCHED_EXECS entries, plus the skipped ones, to reach the desired action
1538 Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
1543 * Gets a count of the number of lock records in the DB.
1545 * @return the number of lock records in the DB
1546 * @throws SQLException if an error occurs accessing the DB
1548 private int getRecordCount() throws SQLException {
1549 try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1550 ResultSet result = stmt.executeQuery()) {
1552 if (result.next()) {
1553 return result.getInt(1);
1562 * Determines if there is a record for the given resource whose expiration time is in
1563 * the expected range.
1565 * @param resourceId ID of the resource of interest
1566 * @param uuidString UUID string of the owner
1567 * @param holdSec seconds for which the lock was to be held
1568 * @param tbegin earliest time, in milliseconds, at which the record could have been
1569 * inserted into the DB
1570 * @return {@code true} if a record is found, {@code false} otherwise
1571 * @throws SQLException if an error occurs accessing the DB
1573 private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
// ask the DB for the number of seconds between its now() and the record's expirationTime
1574 try (PreparedStatement stmt =
1575 conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1576 + " WHERE resourceId=? AND host=? AND owner=?")) {
1578 stmt.setString(1, resourceId);
// the host column is always matched against the current feature's host name
1579 stmt.setString(2, feature.getHostName());
1580 stmt.setString(3, uuidString);
1582 try (ResultSet result = stmt.executeQuery()) {
1583 if (result.next()) {
// seconds remaining until the record expires, as computed by the DB
1584 int remaining = result.getInt(1);
// NOTE(review): maxDiff is in MILLISECONDS, whereas (holdSec - remaining) below is in
// SECONDS, so the upper-bound comparison is much looser than it appears; it looks like
// maxDiff should be converted to seconds (rounded up) before comparing - TODO confirm
1585 long maxDiff = System.currentTimeMillis() - tbegin;
// in range if not yet expired and no more of the hold time has been used up than maxDiff
1586 return (remaining >= 0 && holdSec - remaining <= maxDiff);
1596 * Inserts a record into the DB.
1598 * @param resourceId ID of the resource of interest
1599 * @param uuidString UUID string of the owner
1600 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1601 * @throws SQLException if an error occurs accessing the DB
1603 private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1604 this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
// Inserts a record into the DB using an explicit host name. The expiration time is
// computed by the DB as now() plus expireOffset seconds.
1607 private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1608 throws SQLException {
1609 try (PreparedStatement stmt =
1610 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1611 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1613 stmt.setString(1, resourceId);
1614 stmt.setString(2, hostName);
1615 stmt.setString(3, uuidString);
1616 stmt.setInt(4, expireOffset);
// exactly one row should have been inserted
1618 assertEquals(1, stmt.executeUpdate());
1623 * Updates a record in the DB.
1625 * @param resourceId ID of the resource of interest
* @param newHost <i>new</i> host name to store in the record
1626 * @param newUuid UUID string of the <i>new</i> owner
1627 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1628 * @throws SQLException if an error occurs accessing the DB
1630 private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1631 try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1632 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1634 stmt.setString(1, newHost);
1635 stmt.setString(2, newUuid);
1636 stmt.setInt(3, expireOffset);
1637 stmt.setString(4, resourceId);
1639 assertEquals(1, stmt.executeUpdate());
1644 * Feature that uses <i>exsvc</i> to execute requests.
1646 private class MyLockingFeature extends DistributedLockManager {
1648 public MyLockingFeature(boolean init) {
1651 exsvc = mock(ScheduledExecutorService.class);
1652 when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1653 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1656 beforeCreateLockManager(engine, new Properties());
1664 * Feature whose data source always throws exceptions.
1666 private class InvalidDbLockingFeature extends MyLockingFeature {
1667 private boolean isTransient;
1668 private boolean freeLock = false;
1670 public InvalidDbLockingFeature(boolean isTransient) {
1671 // pass "false" because we have to set the error code BEFORE calling
1675 this.isTransient = isTransient;
1677 this.beforeCreateLockManager(engine, new Properties());
1679 this.afterStart(engine);
1683 protected BasicDataSource makeDataSource() throws Exception {
1684 when(datasrc.getConnection()).thenAnswer(answer -> {
1693 doThrow(makeEx()).when(datasrc).close();
1698 protected SQLException makeEx() {
1700 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
1703 return new SQLException(EXPECTED_EXCEPTION);
1709 * Feature whose locks free themselves while free() is already running.
1711 private class FreeWithFreeLockingFeature extends MyLockingFeature {
1712 private boolean relock;
1714 public FreeWithFreeLockingFeature(boolean relock) {
1716 this.relock = relock;
1720 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1721 LockCallback callback) {
1723 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1724 private static final long serialVersionUID = 1L;
1725 private boolean checked = false;
1728 public boolean isUnavailable() {
1730 return super.isUnavailable();
1735 // release and relock
1743 createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
1753 * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1754 * extend it, and then unlock it.
1756 private class MyThread extends Thread {
1757 AssertionError err = null;
1766 for (int x = 0; x < MAX_LOOPS; ++x) {
1770 } catch (AssertionError e) {
1775 private void makeAttempt() {
1777 Semaphore sem = new Semaphore(0);
1779 LockCallback cb = new LockCallback() {
1781 public void lockAvailable(Lock lock) {
1786 public void lockUnavailable(Lock lock) {
1791 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1793 // wait for callback, whether available or unavailable
1794 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1795 if (!lock.isActive()) {
1799 nsuccesses.incrementAndGet();
1801 assertEquals(1, nactive.incrementAndGet());
1803 lock.extend(HOLD_SEC2, cb);
1804 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1805 assertTrue(lock.isActive());
1807 // decrement BEFORE free()
1808 nactive.decrementAndGet();
1810 assertTrue(lock.free());
1811 assertTrue(lock.isUnavailable());
1813 } catch (InterruptedException e) {
1814 Thread.currentThread().interrupt();
1815 throw new AssertionError("interrupted", e);