46033a4a360011a89673e5f75d33deaec581e8f3
[ccsdk/oran.git] /
1 /*-
2  * ========================LICENSE_START=================================
3  * ONAP : ccsdk oran
4  * ======================================================================
5  * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6  * ======================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.ccsdk.oran.a1policymanagementservice.repository;
22
23 import java.util.ArrayList;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Queue;
27
28 import lombok.Getter;
29
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import reactor.core.publisher.Mono;
34 import reactor.core.publisher.MonoSink;
35
36 /**
37  * A resource lock. Exclusive means that the caller takes exclusive ownership of
38  * the resurce. Non exclusive lock means that several users can lock the
39  * resource (for shared usage).
40  */
41 public class Lock {
42     private static final Logger logger = LoggerFactory.getLogger(Lock.class);
43
44     boolean isExclusive = false;
45     private int lockCounter = 0;
46     final Queue<LockRequest> lockRequestQueue = new LinkedList<>();
47     private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
48     private final String label;
49
50     public enum LockType {
51         EXCLUSIVE, SHARED
52     }
53
54     /**
55      * A grant is achieved when the lock is granted.
56      * It can be used for unlocking.
57      */
58     public static class Grant {
59         private final Lock lock;
60         private boolean unlocked = false;
61         @Getter
62         private final String label;
63
64         Grant(Lock lock, String label) {
65             this.lock = lock;
66             this.label = label;
67             logger.trace("Lock granted {}:{}", lock.label, this.label);
68         }
69
70         /**
71          * reactive unlocking. Submits the lock.
72          *
73          * @return the lock
74          */
75         public Mono<Lock> unlock() {
76             if (!isUnlocked()) {
77                 logger.trace("Unlocking lock {}:{}", lock.label, this.label);
78                 return this.lock.unlock();
79             }
80             return Mono.just(this.lock);
81         }
82
83         /**
84          * Synchronuous unlocking
85          */
86         public void unlockBlocking() {
87             if (!isUnlocked()) {
88                 logger.trace("Unlocking lock {}:{}", lock.label, this.label);
89                 this.lock.unlockBlocking();
90             }
91         }
92
93         private boolean isUnlocked() {
94             if (unlocked) {
95                 logger.debug("Lock {}:{} already unlocked", lock.label, this.label);
96                 return true;
97             }
98             unlocked = true;
99             return false;
100         }
101     }
102
103     /**
104      *
105      * @param label a label attached to the lock. For troubleshooting.
106      */
107     public Lock(String label) {
108         this.label = label;
109     }
110
111     /**
112      * Reactive lock. The Lock will be emitted when the lock is granted
113      *
114      * @param lockType type of lock (exclusive/shared)
115      * @param label a label that will be attached to the request. Will be passed
116      *        back in the Grant
117      * @return a Grant that cane be used only to unlock.
118      */
119     public synchronized Mono<Grant> lock(LockType lockType, String label) {
120         if (tryLock(lockType)) {
121             return Mono.just(new Grant(this, label));
122         } else {
123             return Mono.create(monoSink -> addToQueue(monoSink, lockType, label));
124         }
125     }
126
127     /**
128      * A synchronuous variant of locking. The caller thread will be blocked util the
129      * lock is granted.
130      */
131     public synchronized Grant lockBlocking(LockType locktype, String label) {
132         while (!tryLock(locktype)) {
133             this.waitForUnlock();
134         }
135         return new Grant(this, label);
136     }
137
138     public Mono<Lock> unlock() {
139         return Mono.create(monoSink -> {
140             unlockBlocking();
141             monoSink.success(this);
142         });
143     }
144
145     public synchronized void unlockBlocking() {
146         if (lockCounter <= 0) {
147             lockCounter = -1; // Might as well stop, to make it easier to find the problem
148             logger.error("Number of unlocks must match the number of locks");
149         }
150         this.lockCounter--;
151         if (lockCounter == 0) {
152             isExclusive = false;
153         }
154         this.notifyAll();
155         this.processQueuedEntries();
156     }
157
158     @Override
159     public synchronized String toString() {
160         return "Lock " + this.label + ", cnt: " + this.lockCounter + ", exclusive: " + this.isExclusive + ", queued: "
161                 + this.lockRequestQueue.size();
162     }
163
164     /** returns the current number of granted locks */
165     public synchronized int getLockCounter() {
166         return this.lockCounter;
167     }
168
169     private void processQueuedEntries() {
170         List<LockRequest> granted = new ArrayList<>();
171         while (!lockRequestQueue.isEmpty()) {
172             LockRequest request = lockRequestQueue.element();
173             if (tryLock(request.lockType)) {
174                 lockRequestQueue.remove();
175                 granted.add(request);
176             } else {
177                 break; // Avoid starvation
178             }
179         }
180         callbackProcessor.addAll(granted);
181     }
182
183     private synchronized void addToQueue(MonoSink<Grant> callback, LockType lockType, String label) {
184         logger.trace("Lock request queued {}:{}", label, this.label);
185         lockRequestQueue.add(new LockRequest(callback, lockType, this, label));
186     }
187
188     @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
189     private synchronized void waitForUnlock() {
190         try {
191             this.wait();
192         } catch (InterruptedException e) {
193             logger.warn("waitForUnlock interrupted " + this.label, e);
194             Thread.currentThread().interrupt();
195         }
196     }
197
198     private boolean tryLock(LockType lockType) {
199         if (this.isExclusive) {
200             return false;
201         }
202         if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
203             return false;
204         }
205         lockCounter++;
206         this.isExclusive = lockType == LockType.EXCLUSIVE;
207         return true;
208     }
209
210     /**
211      * Represents a queued lock request
212      */
213     private static class LockRequest {
214         final MonoSink<Grant> callback;
215         final LockType lockType;
216         final Lock lock;
217         final String label;
218
219         LockRequest(MonoSink<Grant> callback, LockType lockType, Lock lock, String label) {
220             this.callback = callback;
221             this.lockType = lockType;
222             this.lock = lock;
223             this.label = label;
224         }
225     }
226
227     /**
228      * A separate thread that calls a MonoSink to continue. This is done after a
229      * queued lock is granted.
230      */
231     private static class AsynchCallbackExecutor implements Runnable {
232         private List<LockRequest> lockRequestQueue = new LinkedList<>();
233
234         public AsynchCallbackExecutor() {
235             Thread thread = new Thread(this);
236             thread.start();
237         }
238
239         public synchronized void addAll(List<LockRequest> requests) {
240             this.lockRequestQueue.addAll(requests);
241             this.notifyAll();
242         }
243
244         @Override
245         public void run() {
246             try {
247                 while (true) {
248                     for (LockRequest request : consume()) {
249                         Grant g = new Grant(request.lock, request.label);
250                         request.callback.success(g);
251                     }
252                     waitForNewEntries();
253                 }
254             } catch (InterruptedException e) {
255                 Thread.currentThread().interrupt();
256                 logger.error("Interrupted {}", e.getMessage());
257             }
258         }
259
260         private synchronized List<LockRequest> consume() {
261             List<LockRequest> q = this.lockRequestQueue;
262             this.lockRequestQueue = new LinkedList<>();
263             return q;
264         }
265
266         @SuppressWarnings("java:S2274")
267         private synchronized void waitForNewEntries() throws InterruptedException {
268             if (this.lockRequestQueue.isEmpty()) {
269                 this.wait();
270             }
271         }
272     }
273 }