2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.distributed.locking;
23 import static org.assertj.core.api.Assertions.assertThat;
24 import static org.assertj.core.api.Assertions.assertThatCode;
25 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26 import static org.assertj.core.api.Assertions.assertThatThrownBy;
27 import static org.junit.Assert.assertEquals;
28 import static org.junit.Assert.assertFalse;
29 import static org.junit.Assert.assertNotNull;
30 import static org.junit.Assert.assertNotSame;
31 import static org.junit.Assert.assertNull;
32 import static org.junit.Assert.assertSame;
33 import static org.junit.Assert.assertTrue;
34 import static org.mockito.ArgumentMatchers.any;
35 import static org.mockito.ArgumentMatchers.anyBoolean;
36 import static org.mockito.ArgumentMatchers.anyLong;
37 import static org.mockito.ArgumentMatchers.eq;
38 import static org.mockito.Mockito.doThrow;
39 import static org.mockito.Mockito.mock;
40 import static org.mockito.Mockito.never;
41 import static org.mockito.Mockito.times;
42 import static org.mockito.Mockito.verify;
43 import static org.mockito.Mockito.when;
45 import java.io.ByteArrayInputStream;
46 import java.io.ByteArrayOutputStream;
47 import java.io.ObjectInputStream;
48 import java.io.ObjectOutputStream;
49 import java.sql.Connection;
50 import java.sql.DriverManager;
51 import java.sql.PreparedStatement;
52 import java.sql.ResultSet;
53 import java.sql.SQLException;
54 import java.sql.SQLTransientException;
55 import java.util.ArrayList;
56 import java.util.List;
57 import java.util.Properties;
58 import java.util.concurrent.Executors;
59 import java.util.concurrent.RejectedExecutionException;
60 import java.util.concurrent.ScheduledExecutorService;
61 import java.util.concurrent.ScheduledFuture;
62 import java.util.concurrent.Semaphore;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.atomic.AtomicBoolean;
65 import java.util.concurrent.atomic.AtomicInteger;
66 import java.util.concurrent.atomic.AtomicReference;
67 import org.apache.commons.dbcp2.BasicDataSource;
68 import org.junit.After;
69 import org.junit.AfterClass;
70 import org.junit.Before;
71 import org.junit.BeforeClass;
72 import org.junit.Test;
73 import org.kie.api.runtime.KieSession;
74 import org.mockito.ArgumentCaptor;
75 import org.mockito.Mock;
76 import org.mockito.MockitoAnnotations;
77 import org.onap.policy.common.utils.services.OrderedServiceImpl;
78 import org.onap.policy.distributed.locking.DistributedLockManager.DistributedLock;
79 import org.onap.policy.drools.core.PolicySession;
80 import org.onap.policy.drools.core.lock.Lock;
81 import org.onap.policy.drools.core.lock.LockCallback;
82 import org.onap.policy.drools.core.lock.LockState;
83 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
84 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
85 import org.onap.policy.drools.system.PolicyEngine;
86 import org.onap.policy.drools.system.PolicyEngineConstants;
87 import org.powermock.reflect.Whitebox;
89 public class DistributedLockManagerTest {
// intervals, in seconds, handed to runChecker(): EXPIRE_SEC is the normal check interval and
// RETRY_SEC the shorter one expected after a DB failure
90 private static final long EXPIRE_SEC = 900L;
91 private static final long RETRY_SEC = 60L;
// name of the PolicyEngine field that is read and overwritten via Whitebox
92 private static final String POLICY_ENGINE_EXECUTOR_FIELD = "executorService";
// host/owner values written into DB records to simulate locks held by somebody else
93 private static final String OTHER_HOST = "other-host";
94 private static final String OTHER_OWNER = "other-owner";
// message carried by the exceptions that the tests throw on purpose
95 private static final String EXPECTED_EXCEPTION = "expected exception";
// in-memory H2 database (schema "pooling") that holds the lock table
96 private static final String DB_CONNECTION =
97 "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
98 private static final String DB_USER = "user";
99 private static final String DB_PASSWORD = "password";
// owner key and resource identifiers used when requesting locks
100 private static final String OWNER_KEY = "my key";
101 private static final String RESOURCE = "my resource";
102 private static final String RESOURCE2 = "my resource #2";
103 private static final String RESOURCE3 = "my resource #3";
104 private static final String RESOURCE4 = "my resource #4";
105 private static final String RESOURCE5 = "my resource #5";
// lock hold times, in seconds
106 private static final int HOLD_SEC = 100;
107 private static final int HOLD_SEC2 = 120;
108 private static final int MAX_THREADS = 5;
109 private static final int MAX_LOOPS = 100;
// flags passed to InvalidDbLockingFeature; presumably they select a transient vs. a permanent
// SQL exception - TODO confirm against that class
110 private static final boolean TRANSIENT = true;
111 private static final boolean PERMANENT = false;
113 // number of execute() calls before the first lock attempt
114 private static final int PRE_LOCK_EXECS = 1;
116 // number of execute() calls before the first schedule attempt
117 private static final int PRE_SCHED_EXECS = 1;
// shared DB connection, the engine's original executor (saved so that it can be restored) and a
// real scheduled thread pool
119 private static Connection conn = null;
120 private static ScheduledExecutorService saveExec;
121 private static ScheduledExecutorService realExec;
// collaborators handed to the feature
// NOTE(review): presumably Mockito mocks populated by initMocks() in setUp() - confirm
124 private PolicyEngine engine;
127 private KieSession kieSess;
130 private ScheduledExecutorService exsvc;
133 private ScheduledFuture<?> checker;
136 private LockCallback callback;
139 private BasicDataSource datasrc;
// per-test state: lock and session under test, counters reset in setUp(), and the feature itself
141 private DistributedLock lock;
142 private PolicySession session;
144 private AtomicInteger nactive;
145 private AtomicInteger nsuccesses;
146 private DistributedLockManager feature;
150 * Configures the location of the property files and creates the DB.
152 * @throws SQLException if the DB cannot be created
// One-time fixture: points the persistence manager at the test resources, opens the DB
// connection, creates the pooling.locks table, remembers the engine's current executor and
// builds a real three-thread scheduled pool.
155 public static void setUpBeforeClass() throws SQLException {
156 SystemPersistenceConstants.getManager().setConfigurationDir("src/test/resources");
158 conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
// lock table: one row per resource, keyed by resourceId
160 try (PreparedStatement createStmt = conn.prepareStatement("create table pooling.locks "
161 + "(resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), "
162 + "expirationTime TIMESTAMP DEFAULT 0, PRIMARY KEY (resourceId))")) {
163 createStmt.executeUpdate();
// keep the original executor so that tearDownAfterClass() can put it back
166 saveExec = Whitebox.getInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD);
168 realExec = Executors.newScheduledThreadPool(3);
172 * Restores static fields.
// Puts the executor saved in setUpBeforeClass() back into the PolicyEngine manager.
175 public static void tearDownAfterClass() throws SQLException {
176 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, saveExec);
182 * Initializes the mocks and creates a feature that uses {@link #exsvc} to execute
185 * @throws SQLException if the lock records cannot be deleted from the DB
// Per-test fixture: processes the Mockito annotations, builds a PolicySession whose
// insertDrools() runs the inserted object synchronously, resets the counters and creates the
// feature under test.
188 public void setUp() throws SQLException {
189 MockitoAnnotations.initMocks(this);
191 // grant() and deny() calls will come through here and be immediately executed
192 session = new PolicySession(null, null, kieSess) {
194 public void insertDrools(Object object) {
// the inserted object is expected to be a Runnable; run it in the calling thread
195 ((Runnable) object).run();
199 session.setPolicySession();
201 nactive = new AtomicInteger(0);
202 nsuccesses = new AtomicInteger(0);
206 feature = new MyLockingFeature(true);
// Per-test tear-down; declared to propagate SQLException.
210 public void tearDown() throws SQLException {
// Removes every row from the pooling.locks table, using the shared connection.
215 private void cleanDb() throws SQLException {
216 try (PreparedStatement stmt = conn.prepareStatement("DELETE FROM pooling.locks")) {
217 stmt.executeUpdate();
// Stops the current feature, if there is one, by invoking afterStop() on it.
221 private void shutdownFeature() {
222 if (feature != null) {
223 feature.afterStop(engine);
229 * Tests that the feature is found in the expected service sets.
// Passes when at least one of the PolicyEngineFeatureApi implementations listed by
// OrderedServiceImpl is a DistributedLockManager.
232 public void testServiceApis() {
233 assertTrue(new OrderedServiceImpl<>(PolicyEngineFeatureApi.class).getList().stream()
234 .anyMatch(obj -> obj instanceof DistributedLockManager));
// The feature is expected to report a sequence number of 1000.
238 public void testGetSequenceNumber() {
239 assertEquals(1000, feature.getSequenceNumber());
// beforeCreateLockManager() is expected to hand back the very same feature instance.
243 public void testBeforeCreateLockManager() {
244 assertSame(feature, feature.beforeCreateLockManager(engine, new Properties()));
248 * Tests beforeCreate(), when getProperties() throws a runtime exception.
// A runtime exception raised by getProperties() must surface from beforeCreateLockManager()
// as a DistributedLockManagerException.
251 public void testBeforeCreateLockManagerEx() {
254 feature = new MyLockingFeature(false) {
256 protected Properties getProperties(String fileName) {
257 throw new IllegalArgumentException(EXPECTED_EXCEPTION);
261 assertThatThrownBy(() -> feature.beforeCreateLockManager(engine, new Properties()))
262 .isInstanceOf(DistributedLockManagerException.class);
// Expects exactly one execute() and one schedule() call on the executor by the time this test
// runs. NOTE(review): presumably issued when setUp() creates the feature - confirm.
266 public void testAfterStart() {
267 // verify that cleanup & expire check are both added to the queue
268 verify(exsvc).execute(any());
269 verify(exsvc).schedule(any(Runnable.class), anyLong(), any());
273 * Tests afterStart(), when thread pool throws a runtime exception.
// When the executor's execute() throws, afterStart() must fail with a
// DistributedLockManagerException.
276 public void testAfterStartExInThreadPool() {
279 feature = new MyLockingFeature(false);
281 doThrow(new IllegalArgumentException(EXPECTED_EXCEPTION)).when(exsvc).execute(any());
283 assertThatThrownBy(() -> feature.afterStart(engine)).isInstanceOf(DistributedLockManagerException.class);
// Seeds the table with four records and checks that, after the clean-up action captured from
// execute(), only the two unexpired ones are still present.
287 public void testDeleteExpiredDbLocks() throws SQLException {
288 // add records: two expired, two not
289 insertRecord(RESOURCE, feature.getUuidString(), -1);
290 insertRecord(RESOURCE2, feature.getUuidString(), HOLD_SEC2);
291 insertRecord(RESOURCE3, OTHER_OWNER, 0);
292 insertRecord(RESOURCE4, OTHER_OWNER, HOLD_SEC);
294 // get the clean-up function and execute it
295 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
296 verify(exsvc).execute(captor.capture());
298 long tbegin = System.currentTimeMillis();
299 Runnable action = captor.getValue();
// records with a hold time of -1 or 0 must be gone; the others must be intact
302 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
303 assertTrue(recordInRange(RESOURCE2, feature.getUuidString(), HOLD_SEC2, tbegin));
304 assertFalse(recordInRange(RESOURCE3, OTHER_OWNER, HOLD_SEC, tbegin));
305 assertTrue(recordInRange(RESOURCE4, OTHER_OWNER, HOLD_SEC, tbegin));
307 assertEquals(2, getRecordCount());
311 * Tests deleteExpiredDbLocks(), when getConnection() throws an exception.
313 * @throws SQLException if an error occurs
// Same clean-up action as above, but obtained from a feature built on InvalidDbLockingFeature;
// the action is expected to absorb the DB failure.
316 public void testDeleteExpiredDbLocksEx() throws SQLException {
317 feature = new InvalidDbLockingFeature(TRANSIENT);
319 // get the clean-up function and execute it
320 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
321 verify(exsvc).execute(captor.capture());
323 Runnable action = captor.getValue();
325 // should not throw an exception
// Expects the scheduled checker to have been cancelled, then switches to a plain
// DistributedLockManager to cover shutdown of a feature that was never started.
330 public void testAfterStop() {
332 verify(checker).cancel(anyBoolean());
334 feature = new DistributedLockManager();
336 // shutdown without calling afterStart()
342 * Tests afterStop(), when the data source throws an exception when close() is called.
344 * @throws SQLException if an error occurs
// Shutting down the feature must not propagate an exception, even with the failing data source.
347 public void testAfterStopEx() throws SQLException {
350 // use a data source that throws an exception when closed
351 feature = new InvalidDbLockingFeature(TRANSIENT);
353 assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
// Walks a lock through creation: the first request for a resource waits, later requests for
// the same resource are denied immediately (with a callback), and the first lock's callbacks
// and DB record are checked both while it is waiting and once it is active.
357 public void testCreateLock() throws SQLException {
358 verify(exsvc).execute(any());
360 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
361 assertTrue(lock.isWaiting());
// creating the lock must have queued exactly one more task on the executor
363 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
365 // this lock should fail
366 LockCallback callback2 = mock(LockCallback.class);
367 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback2, false);
368 assertTrue(lock2.isUnavailable());
369 verify(callback2, never()).lockAvailable(lock2);
370 verify(callback2).lockUnavailable(lock2);
372 // this should fail, too
373 LockCallback callback3 = mock(LockCallback.class);
374 DistributedLock lock3 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback3, false);
375 assertTrue(lock3.isUnavailable());
376 verify(callback3, never()).lockAvailable(lock3);
377 verify(callback3).lockUnavailable(lock3);
379 // no change to first
380 assertTrue(lock.isWaiting());
382 // no callbacks to the first lock
383 verify(callback, never()).lockAvailable(lock);
384 verify(callback, never()).lockUnavailable(lock);
// while the lock is still waiting, nothing has been written to the DB
386 assertTrue(lock.isWaiting());
387 assertEquals(0, getRecordCount());
// once active, there is exactly one record and only lockAvailable() has been invoked
390 assertTrue(lock.isActive());
391 assertEquals(1, getRecordCount());
393 verify(callback).lockAvailable(lock);
394 verify(callback, never()).lockUnavailable(lock);
396 // this should succeed
397 DistributedLock lock4 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback, false);
398 assertTrue(lock4.isWaiting());
400 // after running checker, original records should still remain
401 runChecker(0, 0, EXPIRE_SEC);
402 assertEquals(1, getRecordCount());
403 verify(callback, never()).lockUnavailable(lock);
407 * Tests createLock() when the feature is not the latest instance.
// With the latest-instance reference cleared, createLock() must return a lock that is already
// unavailable and must notify the callback via lockUnavailable() only.
410 public void testCreateLockNotLatestInstance() {
411 DistributedLockManager.setLatestInstance(null);
413 Lock lock = feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
414 assertTrue(lock.isUnavailable());
415 verify(callback, never()).lockAvailable(any());
416 verify(callback).lockUnavailable(lock);
// Creates five locks, then alters three of the DB records (one expired, one owned by another
// host, one owned by another uuid) and expects the checker to mark exactly those three locks
// unavailable while the other two stay active.
420 public void testCheckExpired() throws SQLException {
421 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
424 LockCallback callback2 = mock(LockCallback.class);
425 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
428 LockCallback callback3 = mock(LockCallback.class);
429 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
432 LockCallback callback4 = mock(LockCallback.class);
433 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
436 LockCallback callback5 = mock(LockCallback.class);
437 final DistributedLock lock5 = getLock(RESOURCE5, OWNER_KEY, HOLD_SEC, callback5, false);
440 assertEquals(5, getRecordCount());
// give the first record a negative hold time so that it is already expired
443 updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
445 // change host of another record
446 updateRecord(RESOURCE3, OTHER_HOST, feature.getUuidString(), HOLD_SEC);
448 // change uuid of another record
449 updateRecord(RESOURCE5, feature.getHostName(), OTHER_OWNER, HOLD_SEC);
453 runChecker(0, 0, EXPIRE_SEC);
// locks whose records were altered are lost; the untouched ones remain active
457 assertTrue(lock.isUnavailable());
458 assertTrue(lock2.isActive());
459 assertTrue(lock3.isUnavailable());
460 assertTrue(lock4.isActive());
461 assertTrue(lock5.isUnavailable());
// only the owners of the lost locks are notified
467 verify(callback).lockUnavailable(lock);
468 verify(callback3).lockUnavailable(lock3);
469 verify(callback5).lockUnavailable(lock5);
471 verify(callback2, never()).lockUnavailable(lock2);
472 verify(callback4, never()).lockUnavailable(lock4);
475 // another check should have been scheduled, with the normal interval
476 runChecker(1, 0, EXPIRE_SEC);
480 * Tests checkExpired(), when schedule() throws an exception.
// Runs the checker while every schedule() call on the executor is rejected.
483 public void testCheckExpiredExecRejected() {
484 // arrange for execution to be rejected
485 when(exsvc.schedule(any(Runnable.class), anyLong(), any()))
486 .thenThrow(new RejectedExecutionException(EXPECTED_EXCEPTION));
488 runChecker(0, 0, EXPIRE_SEC);
492 * Tests checkExpired(), when getConnection() throws an exception.
// After the checker hits a DB failure, the next check is expected to be scheduled with the
// shorter RETRY_SEC interval instead of EXPIRE_SEC.
495 public void testCheckExpiredSqlEx() {
496 // use a data source that throws an exception when getConnection() is called
497 feature = new InvalidDbLockingFeature(TRANSIENT);
499 runChecker(0, 0, EXPIRE_SEC);
501 // it should have scheduled another check, sooner
502 runChecker(0, 0, RETRY_SEC);
506 * Tests checkExpired(), when getConnection() throws an exception and the feature is stopped.
// Overrides makeEx() on the failing-DB feature and checks that schedule() has been called only
// once in total, i.e. that the failed check did not schedule a follow-up.
510 public void testCheckExpiredSqlExFeatureStopped() {
511 // use a data source that throws an exception when getConnection() is called
512 feature = new InvalidDbLockingFeature(TRANSIENT) {
514 protected SQLException makeEx() {
516 return super.makeEx();
520 runChecker(0, 0, EXPIRE_SEC);
522 // it should NOT have scheduled another check
523 verify(exsvc, times(1)).schedule(any(Runnable.class), anyLong(), any());
// Runs the checker against four locks and expects lock (record expired) and lock4 to end up
// unavailable, lock2 to stay active and lock3 to stay waiting; only lock's callback is told
// that its lock became unavailable.
527 public void testExpireLocks() throws SQLException {
// holder consulted (and cleared) by the getConnection() answer below
528 AtomicReference<DistributedLock> freeLock = new AtomicReference<>(null);
530 feature = new MyLockingFeature(true) {
532 protected BasicDataSource makeDataSource() throws Exception {
533 // get the real data source
534 BasicDataSource src2 = super.makeDataSource();
// datasrc delegates to the real data source for the actual connection
536 when(datasrc.getConnection()).thenAnswer(answer -> {
537 DistributedLock lck = freeLock.getAndSet(null);
546 return src2.getConnection();
553 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
556 LockCallback callback2 = mock(LockCallback.class);
557 final DistributedLock lock2 = getLock(RESOURCE2, OWNER_KEY, HOLD_SEC, callback2, false);
560 LockCallback callback3 = mock(LockCallback.class);
561 final DistributedLock lock3 = getLock(RESOURCE3, OWNER_KEY, HOLD_SEC, callback3, false);
562 // don't run doLock for lock3 - leave it in the waiting state
564 LockCallback callback4 = mock(LockCallback.class);
565 final DistributedLock lock4 = getLock(RESOURCE4, OWNER_KEY, HOLD_SEC, callback4, false);
// lock3 never reached the DB, hence three records rather than four
568 assertEquals(3, getRecordCount());
// give the first record a negative hold time so that it is already expired
571 updateRecord(RESOURCE, feature.getHostName(), feature.getUuidString(), -1);
573 // arrange to free lock4 while the checker is running
577 runChecker(0, 0, EXPIRE_SEC);
581 assertTrue(lock.isUnavailable());
582 assertTrue(lock2.isActive());
583 assertTrue(lock3.isWaiting());
584 assertTrue(lock4.isUnavailable());
587 verify(exsvc, times(PRE_LOCK_EXECS + 5)).execute(any());
// lock4 is unavailable too, but its callback is not expected to be notified
589 verify(callback).lockUnavailable(lock);
590 verify(callback2, never()).lockUnavailable(lock2);
591 verify(callback3, never()).lockUnavailable(lock3);
592 verify(callback4, never()).lockUnavailable(lock4);
// A lock built with the no-arg constructor has no resource id, owner key or callback and a
// hold time of zero.
596 public void testDistributedLockNoArgs() {
597 DistributedLock lock = new DistributedLock();
598 assertNull(lock.getResourceId());
599 assertNull(lock.getOwnerKey());
600 assertNull(lock.getCallback());
601 assertEquals(0, lock.getHoldSec());
// createLock() must reject a negative hold time with an IllegalArgumentException mentioning
// "holdSec", and must accept a positive one.
605 public void testDistributedLock() {
606 assertThatIllegalArgumentException()
607 .isThrownBy(() -> feature.createLock(RESOURCE, OWNER_KEY, -1, callback, false))
608 .withMessageContaining("holdSec");
610 // should generate no exception
611 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// After a round trip through roundTrip(), the lock keeps its state, resource id, owner key and
// hold time, but its callback comes back as null.
615 public void testDistributedLockSerializable() throws Exception {
616 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
617 lock = roundTrip(lock);
619 assertTrue(lock.isWaiting());
621 assertEquals(RESOURCE, lock.getResourceId());
622 assertEquals(OWNER_KEY, lock.getOwnerKey());
623 assertNull(lock.getCallback());
624 assertEquals(HOLD_SEC, lock.getHoldSec());
// A freshly requested lock is not active; once it is active, the callback must have received
// lockAvailable().
628 public void testGrant() {
629 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
630 assertFalse(lock.isActive());
632 // execute the doLock() call
635 assertTrue(lock.isActive());
637 // the callback for the lock should have been run in the foreground thread
638 verify(callback).lockAvailable(lock);
// A second request for a resource that has already been requested is denied at once: the lock
// is unavailable, the callback is told so, and no extra task is queued on the executor.
642 public void testDistributedLockDeny() {
// first request for the resource; its returned lock is not needed
644 feature.createLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
646 // get another lock - should fail
647 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
649 assertTrue(lock.isUnavailable());
651 // the callback for the second lock should have been run in the foreground thread
652 verify(callback).lockUnavailable(lock);
654 // should only have a request for the first lock
655 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
// free() succeeds once and leaves the lock unavailable; a second free() returns false and
// queues nothing further; afterwards the same resource can be requested again.
659 public void testDistributedLockFree() {
660 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
662 assertTrue(lock.free());
663 assertTrue(lock.isUnavailable());
665 // run both requests associated with the lock
669 // should not have changed state
670 assertTrue(lock.isUnavailable());
672 // attempt to free it again
673 assertFalse(lock.free());
675 // should not have queued anything else
676 verify(exsvc, times(PRE_LOCK_EXECS + 2)).execute(any());
678 // new lock should succeed
679 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
680 assertNotSame(lock2, lock);
681 assertTrue(lock2.isWaiting());
685 * Tests that free() works on a serialized lock with a new feature.
687 * @throws Exception if an error occurs
// A lock that went through roundTrip() after a new feature instance was created can still be
// freed, and ends up unavailable.
690 public void testDistributedLockFreeSerialized() throws Exception {
691 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
693 feature = new MyLockingFeature(true);
695 lock = roundTrip(lock);
696 assertTrue(lock.free());
697 assertTrue(lock.isUnavailable());
701 * Tests free() on a serialized lock without a feature.
703 * @throws Exception if an error occurs
// With the latest-instance reference cleared, free() on a round-tripped lock returns false,
// yet the lock still ends up unavailable.
706 public void testDistributedLockFreeNoFeature() throws Exception {
707 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
709 DistributedLockManager.setLatestInstance(null);
711 lock = roundTrip(lock);
712 assertFalse(lock.free());
713 assertTrue(lock.isUnavailable());
717 * Tests the case where the lock is freed and doUnlock called between the call to
718 * isUnavailable() and the call to compute().
// Uses FreeWithFreeLockingFeature(true); free() is then expected to return false and to leave
// the lock unavailable.
721 public void testDistributedLockFreeUnlocked() {
722 feature = new FreeWithFreeLockingFeature(true);
724 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
726 assertFalse(lock.free());
727 assertTrue(lock.isUnavailable());
731 * Tests the case where the lock is freed, but doUnlock is not completed, between the
732 * call to isUnavailable() and the call to compute().
// Uses FreeWithFreeLockingFeature(false); free() is again expected to return false and to
// leave the lock unavailable.
735 public void testDistributedLockFreeLockFreed() {
736 feature = new FreeWithFreeLockingFeature(false);
738 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
740 assertFalse(lock.free());
741 assertTrue(lock.isUnavailable());
// Covers extend(): a denied lock stays denied however often it is extended (even when forced
// into the ACTIVE state), a negative hold time is rejected, and extending the active lock puts
// it back into waiting until the new hold time is granted through the new callback.
745 public void testDistributedLockExtend() {
746 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
748 // lock2 should be denied - called back by this thread
749 DistributedLock lock2 = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
750 verify(callback, never()).lockAvailable(lock2);
751 verify(callback).lockUnavailable(lock2);
753 // lock2 will still be denied - called back by this thread
754 lock2.extend(HOLD_SEC, callback);
755 verify(callback, times(2)).lockUnavailable(lock2);
757 // force lock2 to be active - should still be denied
758 Whitebox.setInternalState(lock2, "state", LockState.ACTIVE);
759 lock2.extend(HOLD_SEC, callback);
760 verify(callback, times(3)).lockUnavailable(lock2);
// a negative hold time is rejected, just as it is by createLock()
762 assertThatIllegalArgumentException().isThrownBy(() -> lock.extend(-1, callback))
763 .withMessageContaining("holdSec");
767 assertTrue(lock.isActive());
769 // now extend the first lock
770 LockCallback callback2 = mock(LockCallback.class);
771 lock.extend(HOLD_SEC2, callback2);
772 assertTrue(lock.isWaiting());
774 // execute doExtend()
776 lock.extend(HOLD_SEC2, callback2);
777 assertEquals(HOLD_SEC2, lock.getHoldSec());
778 verify(callback2).lockAvailable(lock);
779 verify(callback2, never()).lockUnavailable(lock);
783 * Tests that extend() works on a serialized lock with a new feature.
785 * @throws Exception if an error occurs
// An active lock that is round-tripped after a new feature instance was created can be
// extended: it goes back to waiting, becomes active again, and the new callback receives
// lockAvailable() only.
788 public void testDistributedLockExtendSerialized() throws Exception {
789 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
793 assertTrue(lock.isActive());
795 feature = new MyLockingFeature(true);
797 lock = roundTrip(lock);
798 assertTrue(lock.isActive());
800 LockCallback scallback = mock(LockCallback.class);
802 lock.extend(HOLD_SEC, scallback);
803 assertTrue(lock.isWaiting());
805 // run doExtend (in new feature)
807 assertTrue(lock.isActive());
809 verify(scallback).lockAvailable(lock);
810 verify(scallback, never()).lockUnavailable(lock);
814 * Tests extend() on a serialized lock without a feature.
816 * @throws Exception if an error occurs
// With the latest-instance reference cleared, extending a round-tripped active lock makes it
// unavailable and notifies the new callback via lockUnavailable() only.
819 public void testDistributedLockExtendNoFeature() throws Exception {
820 DistributedLock lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
824 assertTrue(lock.isActive());
826 DistributedLockManager.setLatestInstance(null);
828 lock = roundTrip(lock);
829 assertTrue(lock.isActive());
831 LockCallback scallback = mock(LockCallback.class);
833 lock.extend(HOLD_SEC, scallback);
834 assertTrue(lock.isUnavailable());
836 verify(scallback, never()).lockAvailable(lock);
837 verify(scallback).lockUnavailable(lock);
841 * Tests the case where the lock is freed and doUnlock called between the call to
842 * isUnavailable() and the call to compute().
// Uses FreeWithFreeLockingFeature(true); extend() is then expected to leave the lock
// unavailable and to notify the callback accordingly.
845 public void testDistributedLockExtendUnlocked() {
846 feature = new FreeWithFreeLockingFeature(true);
848 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
850 lock.extend(HOLD_SEC2, callback);
851 assertTrue(lock.isUnavailable());
852 verify(callback).lockUnavailable(lock);
856 * Tests the case where the lock is freed, but doUnlock is not completed, between the
857 * call to isUnavailable() and the call to compute().
// Uses FreeWithFreeLockingFeature(false); extend() is again expected to leave the lock
// unavailable and to notify the callback accordingly.
860 public void testDistributedLockExtendLockFreed() {
861 feature = new FreeWithFreeLockingFeature(false);
863 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
865 lock.extend(HOLD_SEC2, callback);
866 assertTrue(lock.isUnavailable());
867 verify(callback).lockUnavailable(lock);
// Requests a lock and expects the callback to be told that the lock is available.
871 public void testDistributedLockScheduleRequest() {
872 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
875 verify(callback).lockAvailable(lock);
// With a failing data source the lock request is retried rather than denied: the lock first
// stays waiting; after freeLock is set on the feature the lock becomes unavailable, still
// without a lockUnavailable() callback, and two extra schedule() calls have been made.
879 public void testDistributedLockRescheduleRequest() throws SQLException {
880 // use a data source that throws an exception when getConnection() is called
881 InvalidDbLockingFeature invfeat = new InvalidDbLockingFeature(TRANSIENT);
884 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
886 // invoke doLock - should fail and reschedule
889 // should still be waiting
890 assertTrue(lock.isWaiting());
891 verify(callback, never()).lockUnavailable(lock);
893 // free the lock while doLock is executing
894 invfeat.freeLock = true;
896 // try scheduled request - should just invoke doUnlock
899 // should now be unavailable, but without any callback
900 assertTrue(lock.isUnavailable());
901 verify(callback, never()).lockUnavailable(lock);
903 // should have scheduled a retry of doUnlock
904 verify(exsvc, times(PRE_SCHED_EXECS + 2)).schedule(any(Runnable.class), anyLong(), any());
// Requests a lock so that getNextRequest() is exercised both with a queued request and with
// an empty queue.
908 public void testDistributedLockGetNextRequest() {
909 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
912 * run doLock. This should cause getNextRequest() to be called twice, once with a
913 * request in the queue, and the second time with request=null.
919 * Tests getNextRequest(), where the same request is still in the queue the second time.
// With a failing data source the request stays queued, so exactly one additional schedule()
// call is expected.
923 public void testDistributedLockGetNextRequestSameRequest() {
924 // force reschedule to be invoked
925 feature = new InvalidDbLockingFeature(TRANSIENT);
927 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
930 * run doLock. This should cause getNextRequest() to be called twice, once with a
931 * request in the queue, and the second time with the same request again.
935 verify(exsvc, times(PRE_SCHED_EXECS + 1)).schedule(any(Runnable.class), anyLong(), any());
// The lock starts out waiting and is expected to be active once its request has been run.
939 public void testDistributedLockDoRequest() throws SQLException {
940 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
942 assertTrue(lock.isWaiting());
944 // run doLock via doRequest
947 assertTrue(lock.isActive());
951 * Tests doRequest(), when doRequest() is already running within another thread.
// Exercises doRequest() while a request is already in progress: doDbInsert() is overridden so
// that a second (daemon) thread is started during the insert; "returned" records whether that
// thread was no longer alive when the foreground thread carried on, which the test requires.
954 public void testDistributedLockDoRequestBusy() {
956 * this feature will invoke a request in a background thread while it's being run
957 * in a foreground thread.
// NOTE(review): "running" presumably guards against recursion inside doDbInsert() - confirm
959 AtomicBoolean running = new AtomicBoolean(false);
960 AtomicBoolean returned = new AtomicBoolean(false);
962 feature = new MyLockingFeature(true) {
964 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
965 LockCallback callback) {
966 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
967 private static final long serialVersionUID = 1L;
970 protected boolean doDbInsert(Connection conn) throws SQLException {
972 // already inside the thread - don't recurse any further
973 return super.doDbInsert(conn);
978 Thread thread = new Thread(() -> {
979 // run doLock from within the new thread
// daemon thread, so it cannot keep the JVM alive
982 thread.setDaemon(true);
985 // wait for the background thread to complete before continuing
988 } catch (InterruptedException ignore) {
// restore the interrupt flag for the caller
989 Thread.currentThread().interrupt();
// true only if the background thread finished by this point
992 returned.set(!thread.isAlive());
994 return super.doDbInsert(conn);
1000 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1005 assertTrue(returned.get());
1009 * Tests doRequest() when an exception occurs while the lock is in the WAITING state.
1011 * @throws SQLException if an error occurs
// A runtime (non-SQL) exception from getConnection() must deny the lock outright: the lock
// becomes unavailable, the callback is notified, and no retry is scheduled.
1014 public void testDistributedLockDoRequestRunExWaiting() throws SQLException {
1015 // throw run-time exception
1016 when(datasrc.getConnection()).thenThrow(new IllegalStateException(EXPECTED_EXCEPTION));
1018 // use a data source that throws an exception when getConnection() is called
1019 feature = new MyLockingFeature(true) {
1021 protected BasicDataSource makeDataSource() throws Exception {
1026 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1028 // invoke doLock - should NOT reschedule
1031 assertTrue(lock.isUnavailable());
1032 verify(callback).lockUnavailable(lock);
// schedule() count is unchanged, i.e. nothing was rescheduled
1034 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1038 * Tests doRequest() when an exception occurs while the lock is in the UNAVAILABLE state.
1041 * @throws SQLException if an error occurs
// Same runtime failure as the WAITING variant, but raised from an answer; here the lock ends
// up unavailable WITHOUT the callback being notified, and again no retry is scheduled.
1044 public void testDistributedLockDoRequestRunExUnavailable() throws SQLException {
1045 // throw run-time exception
1046 when(datasrc.getConnection()).thenAnswer(answer -> {
1048 throw new IllegalStateException(EXPECTED_EXCEPTION);
1051 // use a data source that throws an exception when getConnection() is called
1052 feature = new MyLockingFeature(true) {
1054 protected BasicDataSource makeDataSource() throws Exception {
1059 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1061 // invoke doLock - should NOT reschedule
1064 assertTrue(lock.isUnavailable());
1065 verify(callback, never()).lockUnavailable(lock);
// schedule() count is unchanged, i.e. nothing was rescheduled
1067 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
1071 * Tests doRequest() when the retry count gets exhausted.
// With a failing data source the lock stays waiting through the initial attempt and the first
// retry; only after the final retry fails does it become unavailable and notify the callback.
1074 public void testDistributedLockDoRequestRetriesExhaustedWhileLocking() {
1075 // use a data source that throws an exception when getConnection() is called
1076 feature = new InvalidDbLockingFeature(TRANSIENT);
1078 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1080 // invoke doLock - should fail and reschedule
1083 // should still be waiting
1084 assertTrue(lock.isWaiting());
1085 verify(callback, never()).lockUnavailable(lock);
1087 // try again, via SCHEDULER - first retry fails
1090 // should still be waiting
1091 assertTrue(lock.isWaiting());
1092 verify(callback, never()).lockUnavailable(lock);
1094 // try again, via SCHEDULER - final retry fails
1096 assertTrue(lock.isUnavailable());
1098 // now callback should have been called
1099 verify(callback).lockUnavailable(lock);
1103 * Tests doRequest() when a non-transient DB exception is thrown.
// A PERMANENT DB failure denies the lock immediately: the callback is notified and neither
// execute() nor schedule() is called again.
1106 public void testDistributedLockDoRequestNotTransient() {
1108 * use a data source that throws a PERMANENT exception when getConnection() is
1111 feature = new InvalidDbLockingFeature(PERMANENT);
1113 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1115 // invoke doLock - should fail
1118 assertTrue(lock.isUnavailable());
1119 verify(callback).lockUnavailable(lock);
1121 // should not have scheduled anything new
1122 verify(exsvc, times(PRE_LOCK_EXECS + 1)).execute(any());
1123 verify(exsvc, times(PRE_SCHED_EXECS)).schedule(any(Runnable.class), anyLong(), any());
// Checks the plain lock path: afterwards exactly one record exists, its expiration
// is consistent with HOLD_SEC, and the callback was told the lock is available.
1127 public void testDistributedLockDoLock() throws SQLException {
1128 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1130 // invoke doLock - should simply do an insert
// captured before the request runs; recordInRange() uses it to bound the expiration time
1131 long tbegin = System.currentTimeMillis();
1134 assertEquals(1, getRecordCount());
1135 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1136 verify(callback).lockAvailable(lock);
// Forces the lock into the UNAVAILABLE state before the request is run and checks
// that no record is written and no lockAvailable() callback is made.
1140 * Tests doLock() when the lock is freed before doLock runs.
1142 * @throws SQLException if an error occurs
1145 public void testDistributedLockDoLockFreed() throws SQLException {
1146 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// simulate the lock having been freed while its request was still queued
1148 lock.setState(LockState.UNAVAILABLE);
1150 // invoke doLock - should do nothing
1153 assertEquals(0, getRecordCount());
1155 verify(callback, never()).lockAvailable(lock);
// Requests a lock from a feature built with the PERMANENT flag and checks that the
// callback is told the lock is unavailable.
1159 * Tests doLock() when a DB exception is thrown.
1162 public void testDistributedLockDoLockEx() {
1163 // use a data source that throws an exception when getConnection() is called
1164 feature = new InvalidDbLockingFeature(PERMANENT);
1166 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1168 // invoke doLock - should fail because the data source throws an exception
1171 // lock should have failed due to exception
1172 verify(callback).lockUnavailable(lock);
// Pre-populates the table with a record for this feature's own UUID whose expiration
// offset is zero, then requests the lock and expects lockAvailable() to be reported.
1176 * Tests doLock() when an (expired) record already exists, thus requiring doUpdate()
1180 public void testDistributedLockDoLockNeedingUpdate() throws SQLException {
1181 // insert an expired record
// an offset of 0 seconds sets the expiration time to the DB's "now" at insertion
1182 insertRecord(RESOURCE, feature.getUuidString(), 0);
1184 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1186 // invoke doLock - should simply do an update
1188 verify(callback).lockAvailable(lock);
// Pre-populates the table with a record held by OTHER_OWNER that expires HOLD_SEC
// seconds from now, then requests the same resource and expects lockUnavailable().
1192 * Tests doLock() when a locked record already exists.
1195 public void testDistributedLockDoLockAlreadyLocked() throws SQLException {
1196 // insert an unexpired record that belongs to a different owner
1197 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
1199 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1204 // lock should have failed because it's already locked
1205 verify(callback).lockUnavailable(lock);
// Checks the unlock path: afterwards the table is empty, the lock is unavailable,
// and the only callback seen is the single lockAvailable() from acquiring the lock.
1209 public void testDistributedLockDoUnlock() throws SQLException {
1210 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1217 // invoke doUnlock()
1218 long tbegin = System.currentTimeMillis();
1221 assertEquals(0, getRecordCount());
// recordInRange() returns false when no matching record exists
1222 assertFalse(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC, tbegin));
1224 assertTrue(lock.isUnavailable());
1226 // no more callbacks should have occurred
1227 verify(callback, times(1)).lockAvailable(lock);
1228 verify(callback, never()).lockUnavailable(lock);
// Uses a feature built with the PERMANENT flag and checks that, after the unlock
// step, the lock is unavailable and neither callback method has been invoked.
1232 * Tests doUnlock() when a DB exception is thrown.
1234 * @throws SQLException if an error occurs
1237 public void testDistributedLockDoUnlockEx() throws SQLException {
1238 feature = new InvalidDbLockingFeature(PERMANENT);
1240 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1242 // do NOT invoke doLock() - it will fail without a DB connection
1246 // invoke doUnlock()
1249 assertTrue(lock.isUnavailable());
1251 // no more callbacks should have occurred
1252 verify(callback, never()).lockAvailable(lock);
1253 verify(callback, never()).lockUnavailable(lock);
// Checks the extend path: the single record now reflects HOLD_SEC2, the lock is
// active, the original callback saw only the initial lockAvailable(), and the
// callback supplied to extend() was told the lock is available.
1257 public void testDistributedLockDoExtend() throws SQLException {
1258 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// a separate mock so the extension's notifications can be told apart from the original's
1261 LockCallback callback2 = mock(LockCallback.class);
1262 lock.extend(HOLD_SEC2, callback2);
1265 long tbegin = System.currentTimeMillis();
1268 assertEquals(1, getRecordCount());
1269 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1271 assertTrue(lock.isActive());
1273 // no more callbacks should have occurred
1274 verify(callback).lockAvailable(lock);
1275 verify(callback, never()).lockUnavailable(lock);
1277 // extension should have succeeded
1278 verify(callback2).lockAvailable(lock);
1279 verify(callback2, never()).lockUnavailable(lock);
// Requests an extension, then forces the lock into the UNAVAILABLE state before the
// extension is run; expects no record in the table and no lockAvailable() callback.
1283 * Tests doExtend() when the lock is freed before doExtend runs.
1285 * @throws SQLException if an error occurs
1288 public void testDistributedLockDoExtendFreed() throws SQLException {
1289 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1290 lock.extend(HOLD_SEC2, callback);
// simulate the lock having been freed while the extension was still queued
1292 lock.setState(LockState.UNAVAILABLE);
1294 // invoke doExtend - should do nothing
1297 assertEquals(0, getRecordCount());
1299 verify(callback, never()).lockAvailable(lock);
// Requests an extension after the lock's record has been removed from the table and
// checks that a single record reflecting HOLD_SEC2 exists afterwards, that the lock
// is active, and that the extension callback was told the lock is available.
1303 * Tests doExtend() when the lock record is missing from the DB, thus requiring an
1306 * @throws SQLException if an error occurs
1309 public void testDistributedLockDoExtendInsertNeeded() throws SQLException {
1310 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// a separate mock so the extension's notifications can be told apart from the original's
1313 LockCallback callback2 = mock(LockCallback.class);
1314 lock.extend(HOLD_SEC2, callback2);
1316 // delete the record so it's forced to re-insert it
1320 long tbegin = System.currentTimeMillis();
1323 assertEquals(1, getRecordCount());
1324 assertTrue(recordInRange(RESOURCE, feature.getUuidString(), HOLD_SEC2, tbegin));
1326 assertTrue(lock.isActive());
1328 // no more callbacks should have occurred
1329 verify(callback).lockAvailable(lock);
1330 verify(callback, never()).lockUnavailable(lock);
1332 // extension should have succeeded
1333 verify(callback2).lockAvailable(lock);
1334 verify(callback2, never()).lockUnavailable(lock);
// Swaps in a feature whose locks override doDbInsert() and doDbUpdate(), then
// requests an extension and expects it to fail: the lock ends up unavailable, the
// original callback saw only the initial lockAvailable(), and the extension callback
// was told the lock is unavailable.
1338 * Tests doExtend() when both update and insert fail.
1340 * @throws SQLException if an error occurs
1343 public void testDistributedLockDoExtendNeitherSucceeds() throws SQLException {
1345 * this feature will create a lock that returns false when doDbUpdate() is
1346 * invoked, or when doDbInsert() is invoked a second time
1348 feature = new MyLockingFeature(true) {
1350 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1351 LockCallback callback) {
1352 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1353 private static final long serialVersionUID = 1L;
// NOTE(review): presumably counts doDbInsert() calls so that the second one can
// be made to fail, as described above - the code that uses it is not visible here
1354 private int ntimes = 0;
1357 protected boolean doDbInsert(Connection conn) throws SQLException {
1362 return super.doDbInsert(conn);
1366 protected boolean doDbUpdate(Connection conn) {
1373 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
// a separate mock so the extension's notifications can be told apart from the original's
1376 LockCallback callback2 = mock(LockCallback.class);
1377 lock.extend(HOLD_SEC2, callback2);
1382 assertTrue(lock.isUnavailable());
1384 // no more callbacks should have occurred
1385 verify(callback).lockAvailable(lock);
1386 verify(callback, never()).lockUnavailable(lock);
1388 // extension should have failed
1389 verify(callback2, never()).lockAvailable(lock);
1390 verify(callback2).lockUnavailable(lock);
// Replaces the lock's record with one held by OTHER_OWNER, then requests an
// extension and expects it to fail: the lock ends up unavailable, the original
// callback saw only the initial lockAvailable(), and the extension callback was told
// the lock is unavailable.
1394 * Tests doExtend() when an exception occurs.
1396 * @throws SQLException if an error occurs
1399 public void testDistributedLockDoExtendEx() throws SQLException {
1400 lock = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false);
1404 * delete the record and insert one with a different owner, which will cause
1405 * doDbInsert() to throw an exception
1408 insertRecord(RESOURCE, OTHER_OWNER, HOLD_SEC);
// a separate mock so the extension's notifications can be told apart from the original's
1410 LockCallback callback2 = mock(LockCallback.class);
1411 lock.extend(HOLD_SEC2, callback2);
1416 assertTrue(lock.isUnavailable());
1418 // no more callbacks should have occurred
1419 verify(callback).lockAvailable(lock);
1420 verify(callback, never()).lockUnavailable(lock);
1422 // extension should have failed
1423 verify(callback2, never()).lockAvailable(lock);
1424 verify(callback2).lockUnavailable(lock);
// Checks that a lock's toString() yields a non-null text that mentions neither
// "ownerInfo" nor "callback".
1428 public void testDistributedLockToString() {
1429 String text = getLock(RESOURCE, OWNER_KEY, HOLD_SEC, callback, false).toString();
1430 assertNotNull(text);
1431 assertThat(text).doesNotContain("ownerInfo").doesNotContain("callback");
// Starts an unmodified DistributedLockManager, rather than one of the test
// subclasses, and checks that shutting it down again raises no exception.
1435 public void testMakeThreadPool() {
1436 // use a REAL feature to test this
1437 feature = new DistributedLockManager();
1439 // this should create a thread pool
1440 feature.beforeCreateLockManager(engine, new Properties());
1441 feature.afterStart(engine);
1443 assertThatCode(() -> shutdownFeature()).doesNotThrowAnyException();
// Starts MAX_THREADS MyThread instances against an unmodified DistributedLockManager,
// checks that none is still alive after the wait loop, inspects each thread's
// recorded AssertionError, and finally requires at least one successful lock.
1447 * Performs a multi-threaded test of the locking facility.
1449 * @throws InterruptedException if the current thread is interrupted while waiting for
1450 * the background threads to complete
1453 public void testMultiThreaded() throws InterruptedException {
// NOTE(review): assumes realExec is a live executor, so that requests really run - confirm
1454 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, realExec);
1456 feature = new DistributedLockManager();
1457 feature.beforeCreateLockManager(PolicyEngineConstants.getManager(), new Properties());
1458 feature.afterStart(PolicyEngineConstants.getManager());
1460 List<MyThread> threads = new ArrayList<>(MAX_THREADS);
1461 for (int x = 0; x < MAX_THREADS; ++x) {
1462 threads.add(new MyThread());
1465 threads.forEach(Thread::start);
1467 for (MyThread thread : threads) {
1469 assertFalse(thread.isAlive());
1472 for (MyThread thread : threads) {
// err is non-null only if an assertion failed on that thread
1473 if (thread.err != null) {
// at least one attempt, across all threads, must have obtained the lock
1478 assertTrue(nsuccesses.get() > 0);
1481 private DistributedLock getLock(String resource, String ownerKey, int holdSec, LockCallback callback,
1482 boolean waitForLock) {
1483 return (DistributedLock) feature.createLock(resource, ownerKey, holdSec, callback, waitForLock);
1486 private DistributedLock roundTrip(DistributedLock lock) throws Exception {
1487 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1488 try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
1489 oos.writeObject(lock);
1492 ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
1493 try (ObjectInputStream ois = new ObjectInputStream(bais)) {
1494 return (DistributedLock) ois.readObject();
// Picks one of the Runnables passed to exsvc.schedule() with a delay of exactly
// schedSec seconds; the verify() call also pins the total number of such calls.
1499 * Runs the checkExpired() action.
1501 * @param nskip number of actions in the work queue to skip
1502 * @param nadditional number of additional actions that appear in the work queue
1503 * <i>after</i> the checkExpired action
1504 * @param schedSec number of seconds for which the checker should have been scheduled
1506 private void runChecker(int nskip, int nadditional, long schedSec) {
1507 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// only schedule() calls with this delay and TimeUnit.SECONDS are counted and captured
1508 verify(exsvc, times(nskip + nadditional + 1)).schedule(captor.capture(), eq(schedSec), eq(TimeUnit.SECONDS));
// captured values are in invocation order, so index nskip is the first action not skipped
1509 Runnable action = captor.getAllValues().get(nskip);
// Picks one of the Runnables passed to exsvc.execute(); the verify() call also pins
// the total number of execute() calls made so far.
1514 * Runs a lock action (e.g., doLock, doUnlock).
1516 * @param nskip number of actions in the work queue to skip
1517 * @param nadditional number of additional actions that appear in the work queue
1518 * <i>after</i> the desired action
1520 void runLock(int nskip, int nadditional) {
1521 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// the first PRE_LOCK_EXECS execute() calls are treated as a baseline preceding the lock actions
1522 verify(exsvc, times(PRE_LOCK_EXECS + nskip + nadditional + 1)).execute(captor.capture());
// step over the baseline calls and the nskip actions the caller asked to skip
1524 Runnable action = captor.getAllValues().get(PRE_LOCK_EXECS + nskip);
// Picks one of the Runnables passed to exsvc.schedule(), whatever its delay; the
// verify() call also pins the total number of schedule() calls made so far.
1529 * Runs a scheduled action (e.g., "retry" action).
1531 * @param nskip number of actions in the work queue to skip
1532 * @param nadditional number of additional actions that appear in the work queue
1533 * <i>after</i> the desired action
1535 void runSchedule(int nskip, int nadditional) {
1536 ArgumentCaptor<Runnable> captor = ArgumentCaptor.forClass(Runnable.class);
// the first PRE_SCHED_EXECS schedule() calls are treated as a baseline preceding the actions of interest
1537 verify(exsvc, times(PRE_SCHED_EXECS + nskip + nadditional + 1)).schedule(captor.capture(), anyLong(), any());
// step over the baseline calls and the nskip actions the caller asked to skip
1539 Runnable action = captor.getAllValues().get(PRE_SCHED_EXECS + nskip);
// Counts every row in pooling.locks, regardless of resource, host or owner.
1544 * Gets a count of the number of lock records in the DB.
1546 * @return the number of lock records in the DB
1547 * @throws SQLException if an error occurs accessing the DB
1549 private int getRecordCount() throws SQLException {
// the statement and its result set are both closed by the try-with-resources
1550 try (PreparedStatement stmt = conn.prepareStatement("SELECT count(*) FROM pooling.locks");
1551 ResultSet result = stmt.executeQuery()) {
1553 if (result.next()) {
// column 1 of the single row holds the count
1554 return result.getInt(1);
1563 * Determines if there is a record for the given resource whose expiration time is in
1564 * the expected range.
1566 * @param resourceId ID of the resource of interest
1567 * @param uuidString UUID string of the owner
1568 * @param holdSec seconds for which the lock was to be held
1569 * @param tbegin earliest time, in milliseconds, at which the record could have been
1570 * inserted into the DB
1571 * @return {@code true} if a record is found, {@code false} otherwise
1572 * @throws SQLException if an error occurs accessing the DB
1574 private boolean recordInRange(String resourceId, String uuidString, int holdSec, long tbegin) throws SQLException {
1575 try (PreparedStatement stmt =
1576 conn.prepareStatement("SELECT timestampdiff(second, now(), expirationTime) FROM pooling.locks"
1577 + " WHERE resourceId=? AND host=? AND owner=?")) {
1579 stmt.setString(1, resourceId);
1580 stmt.setString(2, feature.getHostName());
1581 stmt.setString(3, uuidString);
1583 try (ResultSet result = stmt.executeQuery()) {
1584 if (result.next()) {
1585 int remaining = result.getInt(1);
1586 long maxDiff = System.currentTimeMillis() - tbegin;
1587 return (remaining >= 0 && holdSec - remaining <= maxDiff);
1597 * Inserts a record into the DB.
1599 * @param resourceId ID of the resource of interest
1600 * @param uuidString UUID string of the owner
1601 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1602 * @throws SQLException if an error occurs accessing the DB
1604 private void insertRecord(String resourceId, String uuidString, int expireOffset) throws SQLException {
1605 this.insertRecord(resourceId, feature.getHostName(), uuidString, expireOffset);
1608 private void insertRecord(String resourceId, String hostName, String uuidString, int expireOffset)
1609 throws SQLException {
1610 try (PreparedStatement stmt =
1611 conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) "
1612 + "values (?, ?, ?, timestampadd(second, ?, now()))")) {
1614 stmt.setString(1, resourceId);
1615 stmt.setString(2, hostName);
1616 stmt.setString(3, uuidString);
1617 stmt.setInt(4, expireOffset);
1619 assertEquals(1, stmt.executeUpdate());
1624 * Updates a record in the DB.
1626 * @param resourceId ID of the resource of interest
1627 * @param newUuid UUID string of the <i>new</i> owner
1628 * @param expireOffset offset, in seconds, from "now", at which the lock should expire
1629 * @throws SQLException if an error occurs accessing the DB
1631 private void updateRecord(String resourceId, String newHost, String newUuid, int expireOffset) throws SQLException {
1632 try (PreparedStatement stmt = conn.prepareStatement("UPDATE pooling.locks SET host=?, owner=?,"
1633 + " expirationTime=timestampadd(second, ?, now()) WHERE resourceId=?")) {
1635 stmt.setString(1, newHost);
1636 stmt.setString(2, newUuid);
1637 stmt.setInt(3, expireOffset);
1638 stmt.setString(4, resourceId);
1640 assertEquals(1, stmt.executeUpdate());
// Lock manager used by most tests. Its constructor replaces exsvc with a fresh mock
// and installs that mock as the policy engine's executor, so that queued work can be
// captured and run by hand via runLock(), runSchedule() and runChecker().
1645 * Feature that uses <i>exsvc</i> to execute requests.
1647 private class MyLockingFeature extends DistributedLockManager {
1649 public MyLockingFeature(boolean init) {
1652 exsvc = mock(ScheduledExecutorService.class);
// every schedule() call on the mock hands back the shared "checker" object
1653 when(exsvc.schedule(any(Runnable.class), anyLong(), any())).thenAnswer(ans -> checker);
1654 Whitebox.setInternalState(PolicyEngineConstants.getManager(), POLICY_ENGINE_EXECUTOR_FIELD, exsvc);
1657 beforeCreateLockManager(engine, new Properties());
1665 * Feature whose data source always throws exceptions.
// Lock manager whose data source is stubbed so that getConnection() and close() do
// not behave normally; makeEx() supplies the SQLException, in two flavors: one that
// wraps an SQLTransientException and one that does not.
1667 private class InvalidDbLockingFeature extends MyLockingFeature {
// selects which flavor of exception makeEx() builds - NOTE(review): the test that
// reads this flag is not visible in this listing
1668 private boolean isTransient;
1669 private boolean freeLock = false;
1671 public InvalidDbLockingFeature(boolean isTransient) {
1672 // pass "false" because we have to set the error code BEFORE calling
1676 this.isTransient = isTransient;
// the flag is now set, so the start-up steps can be run explicitly
1678 this.beforeCreateLockManager(engine, new Properties());
1680 this.afterStart(engine);
1684 protected BasicDataSource makeDataSource() throws Exception {
1685 when(datasrc.getConnection()).thenAnswer(answer -> {
// closing the data source fails as well
1694 doThrow(makeEx()).when(datasrc).close();
1699 protected SQLException makeEx() {
// an SQLException whose cause is an SQLTransientException
1701 return new SQLException(new SQLTransientException(EXPECTED_EXCEPTION));
// a plain SQLException, with no transient cause
1704 return new SQLException(EXPECTED_EXCEPTION);
// Lock manager whose makeLock() returns DistributedLock subclasses that override
// isUnavailable(); on the path visible here, the override can also create a new
// lock on RESOURCE for the same owner key.
1710 * Feature whose locks free themselves while free() is already running.
1712 private class FreeWithFreeLockingFeature extends MyLockingFeature {
// NOTE(review): presumably controls whether the lock is re-acquired after being
// released - the code that reads this flag is not visible here
1713 private boolean relock;
1715 public FreeWithFreeLockingFeature(boolean relock) {
1717 this.relock = relock;
1721 protected DistributedLock makeLock(LockState state, String resourceId, String ownerKey, int holdSec,
1722 LockCallback callback) {
1724 return new DistributedLock(state, resourceId, ownerKey, holdSec, callback, feature) {
1725 private static final long serialVersionUID = 1L;
// NOTE(review): presumably ensures the extra behavior in isUnavailable() happens
// only once - the code that uses it is not visible here
1726 private boolean checked = false;
1729 public boolean isUnavailable() {
1731 return super.isUnavailable();
1736 // release and relock
// the new lock gets its own throw-away callback, so the test's callback is not notified for it
1744 createLock(RESOURCE, getOwnerKey(), HOLD_SEC, mock(LockCallback.class), false);
// Worker for testMultiThreaded(). Each attempt requests a lock on RESOURCE, using the
// thread's name as the owner key; if the lock is active it is counted as a success,
// extended, and freed again.
1754 * Thread used with the multi-threaded test. It repeatedly attempts to get a lock,
1755 * extend it, and then unlock it.
1757 private class MyThread extends Thread {
// read by testMultiThreaded() after the thread has finished; stays null unless an
// assertion failed - NOTE(review): presumably assigned in the catch block below
1758 AssertionError err = null;
1767 for (int x = 0; x < MAX_LOOPS; ++x) {
1771 } catch (AssertionError e) {
1776 private void makeAttempt() {
// starts with no permits, so each tryAcquire() below waits until a permit is
// released - presumably by the callback methods, whose bodies are not visible here
1778 Semaphore sem = new Semaphore(0);
1780 LockCallback cb = new LockCallback() {
1782 public void lockAvailable(Lock lock) {
1787 public void lockUnavailable(Lock lock) {
1792 Lock lock = feature.createLock(RESOURCE, getName(), HOLD_SEC, cb, false);
1794 // wait for callback, whether available or unavailable
1795 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1796 if (!lock.isActive()) {
1800 nsuccesses.incrementAndGet();
// the counter must go from 0 to 1, i.e. no other thread may hold the lock right now
1802 assertEquals(1, nactive.incrementAndGet());
1804 lock.extend(HOLD_SEC2, cb);
1805 assertTrue(sem.tryAcquire(5, TimeUnit.SECONDS));
1806 assertTrue(lock.isActive());
1808 // decrement BEFORE free()
1809 nactive.decrementAndGet();
1811 assertTrue(lock.free());
1812 assertTrue(lock.isUnavailable());
1814 } catch (InterruptedException e) {
// restore the interrupt flag before reporting the failure
1815 Thread.currentThread().interrupt();
1816 throw new AssertionError("interrupted", e);