2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.repository;
23 import java.util.ArrayList;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Queue;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 import reactor.core.publisher.Mono;
34 import reactor.core.publisher.MonoSink;
37 * A resource lock. Exclusive means that the caller takes exclusive ownership of
38 * the resurce. Non exclusive lock means that several users can lock the
39 * resource (for shared usage).
42 private static final Logger logger = LoggerFactory.getLogger(Lock.class);
44 boolean isExclusive = false;
45 private int lockCounter = 0;
46 final Queue<LockRequest> lockRequestQueue = new LinkedList<>();
47 private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
49 public enum LockType {
54 * A grant is achieved when the lock is granted.
55 * It can be used for unlocking.
57 public static class Grant {
58 private final Lock lock;
59 private boolean unlocked = false;
61 private final String label;
63 Grant(Lock lock, String label) {
69 * reactive unlocking. Submits the lock.
73 public Mono<Lock> unlock() {
75 return this.lock.unlock();
79 * Synchronuous unlocking
81 public void unlockBlocking() {
83 this.lock.unlockBlocking();
86 private void check() {
88 logger.error("Lock already unlocked");
95 * Reactive lock. The Lock will be emitted when the lock is granted
97 * @param lockType type of lock (exclusive/shared)
98 * @param label a label that will be attached to the request. Will be passed
100 * @return a Grant that cane be used only to unlock.
102 public synchronized Mono<Grant> lock(LockType lockType, String label) {
103 if (tryLock(lockType)) {
104 return Mono.just(new Grant(this, label));
106 return Mono.create(monoSink -> addToQueue(monoSink, lockType, label));
111 * A synchronuous variant of locking. The caller thread will be blocked util the
114 public synchronized Grant lockBlocking(LockType locktype, String label) {
115 while (!tryLock(locktype)) {
116 this.waitForUnlock();
118 return new Grant(this, label);
121 public Mono<Lock> unlock() {
122 return Mono.create(monoSink -> {
124 monoSink.success(this);
128 public synchronized void unlockBlocking() {
129 if (lockCounter <= 0) {
130 lockCounter = -1; // Might as well stop, to make it easier to find the problem
131 logger.error("Number of unlocks must match the number of locks");
134 if (lockCounter == 0) {
138 this.processQueuedEntries();
142 public synchronized String toString() {
143 return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
144 + this.lockRequestQueue.size();
147 /** returns the current number of granted locks */
148 public synchronized int getLockCounter() {
149 return this.lockCounter;
152 private void processQueuedEntries() {
153 List<LockRequest> granted = new ArrayList<>();
154 while (!lockRequestQueue.isEmpty()) {
155 LockRequest request = lockRequestQueue.element();
156 if (tryLock(request.lockType)) {
157 lockRequestQueue.remove();
158 granted.add(request);
160 break; // Avoid starvation
163 callbackProcessor.addAll(granted);
166 private synchronized void addToQueue(MonoSink<Grant> callback, LockType lockType, String label) {
167 lockRequestQueue.add(new LockRequest(callback, lockType, this, label));
170 @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
171 private synchronized void waitForUnlock() {
174 } catch (InterruptedException e) {
175 logger.warn("waitForUnlock interrupted", e);
176 Thread.currentThread().interrupt();
180 private boolean tryLock(LockType lockType) {
181 if (this.isExclusive) {
184 if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
188 this.isExclusive = lockType == LockType.EXCLUSIVE;
193 * Represents a queued lock request
195 private static class LockRequest {
196 final MonoSink<Grant> callback;
197 final LockType lockType;
201 LockRequest(MonoSink<Grant> callback, LockType lockType, Lock lock, String label) {
202 this.callback = callback;
203 this.lockType = lockType;
210 * A separate thread that calls a MonoSink to continue. This is done after a
211 * queued lock is granted.
213 private static class AsynchCallbackExecutor implements Runnable {
214 private List<LockRequest> lockRequestQueue = new LinkedList<>();
216 public AsynchCallbackExecutor() {
217 Thread thread = new Thread(this);
221 public synchronized void addAll(List<LockRequest> requests) {
222 this.lockRequestQueue.addAll(requests);
230 for (LockRequest request : consume()) {
231 Grant g = new Grant(request.lock, request.label);
232 request.callback.success(g);
236 } catch (InterruptedException e) {
237 Thread.currentThread().interrupt();
238 logger.error("Interrupted {}", e.getMessage());
242 private synchronized List<LockRequest> consume() {
243 List<LockRequest> q = this.lockRequestQueue;
244 this.lockRequestQueue = new LinkedList<>();
248 @SuppressWarnings("java:S2274")
249 private synchronized void waitForNewEntries() throws InterruptedException {
250 if (this.lockRequestQueue.isEmpty()) {