2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.repository;
23 import java.util.ArrayList;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Queue;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 import reactor.core.publisher.Mono;
34 import reactor.core.publisher.MonoSink;
37 * A resource lock. Exclusive means that the caller takes exclusive ownership of
38 * the resource. Non-exclusive lock means that several users can lock the
39 * resource (for shared usage).
42 private static final Logger logger = LoggerFactory.getLogger(Lock.class);
44 boolean isExclusive = false;
45 private int lockCounter = 0;
46 final Queue<LockRequest> lockRequestQueue = new LinkedList<>();
47 private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
48 private final String label;
50 public enum LockType {
55 * A grant is achieved when the lock is granted.
56 * It can be used for unlocking.
58 public static class Grant {
59 private final Lock lock;
60 private boolean unlocked = false;
62 private final String label;
64 Grant(Lock lock, String label) {
67 logger.trace("Lock granted {}:{}", lock.label, this.label);
71 * reactive unlocking. Submits the lock.
75 public Mono<Lock> unlock() {
77 logger.trace("Unlocking lock {}:{}", lock.label, this.label);
78 return this.lock.unlock();
80 return Mono.just(this.lock);
84 * Synchronous unlocking
86 public void unlockBlocking() {
88 logger.trace("Unlocking lock {}:{}", lock.label, this.label);
89 this.lock.unlockBlocking();
93 private boolean isUnlocked() {
95 logger.debug("Lock {}:{} already unlocked", lock.label, this.label);
105 * @param label a label attached to the lock. For troubleshooting.
107 public Lock(String label) {
112 * Reactive lock. The Grant will be emitted when the lock is granted
114 * @param lockType type of lock (exclusive/shared)
115 * @param label a label that will be attached to the request. Will be passed
117 * @return a Grant that can be used only to unlock.
119 public synchronized Mono<Grant> lock(LockType lockType, String label) {
120 if (tryLock(lockType)) {
121 return Mono.just(new Grant(this, label));
123 return Mono.create(monoSink -> addToQueue(monoSink, lockType, label));
128 * A synchronous variant of locking. The caller thread will be blocked until the
131 public synchronized Grant lockBlocking(LockType locktype, String label) {
132 while (!tryLock(locktype)) {
133 this.waitForUnlock();
135 return new Grant(this, label);
138 public Mono<Lock> unlock() {
139 return Mono.create(monoSink -> {
141 monoSink.success(this);
145 public synchronized void unlockBlocking() {
146 if (lockCounter <= 0) {
147 lockCounter = -1; // Might as well stop, to make it easier to find the problem
148 logger.error("Number of unlocks must match the number of locks");
151 if (lockCounter == 0) {
155 this.processQueuedEntries();
159 public synchronized String toString() {
160 return "Lock " + this.label + ", cnt: " + this.lockCounter + ", exclusive: " + this.isExclusive + ", queued: "
161 + this.lockRequestQueue.size();
164 /** returns the current number of granted locks */
165 public synchronized int getLockCounter() {
166 return this.lockCounter;
169 private void processQueuedEntries() {
170 List<LockRequest> granted = new ArrayList<>();
171 while (!lockRequestQueue.isEmpty()) {
172 LockRequest request = lockRequestQueue.element();
173 if (tryLock(request.lockType)) {
174 lockRequestQueue.remove();
175 granted.add(request);
177 break; // Avoid starvation
180 callbackProcessor.addAll(granted);
183 private synchronized void addToQueue(MonoSink<Grant> callback, LockType lockType, String label) {
184 logger.trace("Lock request queued {}:{}", label, this.label);
185 lockRequestQueue.add(new LockRequest(callback, lockType, this, label));
188 @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
189 private synchronized void waitForUnlock() {
192 } catch (InterruptedException e) {
193 logger.warn("waitForUnlock interrupted " + this.label, e);
194 Thread.currentThread().interrupt();
198 private boolean tryLock(LockType lockType) {
199 if (this.isExclusive) {
202 if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
206 this.isExclusive = lockType == LockType.EXCLUSIVE;
211 * Represents a queued lock request
213 private static class LockRequest {
214 final MonoSink<Grant> callback;
215 final LockType lockType;
219 LockRequest(MonoSink<Grant> callback, LockType lockType, Lock lock, String label) {
220 this.callback = callback;
221 this.lockType = lockType;
228 * A separate thread that calls a MonoSink to continue. This is done after a
229 * queued lock is granted.
231 private static class AsynchCallbackExecutor implements Runnable {
232 private List<LockRequest> lockRequestQueue = new LinkedList<>();
234 public AsynchCallbackExecutor() {
235 Thread thread = new Thread(this);
239 public synchronized void addAll(List<LockRequest> requests) {
240 this.lockRequestQueue.addAll(requests);
248 for (LockRequest request : consume()) {
249 Grant g = new Grant(request.lock, request.label);
250 request.callback.success(g);
254 } catch (InterruptedException e) {
255 Thread.currentThread().interrupt();
256 logger.error("Interrupted {}", e.getMessage());
260 private synchronized List<LockRequest> consume() {
261 List<LockRequest> q = this.lockRequestQueue;
262 this.lockRequestQueue = new LinkedList<>();
266 @SuppressWarnings("java:S2274")
267 private synchronized void waitForNewEntries() throws InterruptedException {
268 if (this.lockRequestQueue.isEmpty()) {