private int lockCounter = 0;
final Queue<LockRequest> lockRequestQueue = new LinkedList<>();
private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
+ private final String label;
public enum LockType {
EXCLUSIVE, SHARED
Grant(Lock lock, String label) {
this.lock = lock;
this.label = label;
+ logger.trace("Lock granted {}:{}", lock.label, this.label);
}
/**
* @return the lock
*/
public Mono<Lock> unlock() {
- check();
- return this.lock.unlock();
+ if (!isUnlocked()) {
+ logger.trace("Unlocking lock {}:{}", lock.label, this.label);
+ return this.lock.unlock();
+ }
+ return Mono.just(this.lock);
}
/**
* Synchronuous unlocking
*/
public void unlockBlocking() {
- check();
- this.lock.unlockBlocking();
+ if (!isUnlocked()) {
+ logger.trace("Unlocking lock {}:{}", lock.label, this.label);
+ this.lock.unlockBlocking();
+ }
}
- private void check() {
+ private boolean isUnlocked() {
if (unlocked) {
- logger.error("Lock already unlocked");
+ logger.debug("Lock {}:{} already unlocked", lock.label, this.label);
+ return true;
}
unlocked = true;
+ return false;
}
}
+ /**
+ *
+ * @param label a label attached to the lock. For troubleshooting.
+ */
+ public Lock(String label) {
+ this.label = label;
+ }
+
/**
* Reactive lock. The Lock will be emitted when the lock is granted
*
@Override
public synchronized String toString() {
- return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
+ return "Lock " + this.label + ", cnt: " + this.lockCounter + ", exclusive: " + this.isExclusive + ", queued: "
+ this.lockRequestQueue.size();
}
}
private synchronized void addToQueue(MonoSink<Grant> callback, LockType lockType, String label) {
+ logger.trace("Lock request queued {}:{}", label, this.label);
lockRequestQueue.add(new LockRequest(callback, lockType, this, label));
}
try {
this.wait();
} catch (InterruptedException e) {
- logger.warn("waitForUnlock interrupted", e);
+ logger.warn("waitForUnlock interrupted " + this.label, e);
Thread.currentThread().interrupt();
}
}
@Test
void testLock() throws IOException, ServiceException {
- Lock lock = new Lock();
+ Lock lock = new Lock("l1");
Lock.Grant grant = lock.lockBlocking(LockType.SHARED, "test");
grant.unlockBlocking();
assertThat(grant.getLabel()).isEqualTo("test");
@Test
void testReactiveLock() {
- Lock lock = new Lock();
+ Lock lock = new Lock("l1");
Mono<?> l0 = lock.lock(LockType.EXCLUSIVE, "1").doOnNext(grant -> asynchUnlock(grant, lock));
Mono<?> l1 = lock.lock(LockType.SHARED, "2").doOnNext(grant -> asynchUnlock(grant, lock));