18507ac292f527957a2647e32ead93b7e3bbfebe
[ccsdk/oran.git] /
1 /*-
2  * ========================LICENSE_START=================================
3  * ONAP : ccsdk oran
4  * ======================================================================
5  * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
6  * ======================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.ccsdk.oran.a1policymanagementservice.repository;
22
23 import java.util.ArrayList;
24 import java.util.LinkedList;
25 import java.util.List;
26 import java.util.Queue;
27
28 import lombok.Getter;
29
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import reactor.core.publisher.Mono;
34 import reactor.core.publisher.MonoSink;
35
36 /**
37  * A resource lock. Exclusive means that the caller takes exclusive ownership of
38  * the resurce. Non exclusive lock means that several users can lock the
39  * resource (for shared usage).
40  */
41 public class Lock {
42     private static final Logger logger = LoggerFactory.getLogger(Lock.class);
43
44     boolean isExclusive = false;
45     private int lockCounter = 0;
46     final Queue<LockRequest> lockRequestQueue = new LinkedList<>();
47     private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
48
49     public enum LockType {
50         EXCLUSIVE, SHARED
51     }
52
53     /**
54      * A grant is achieved when the lock is granted.
55      * It can be used for unlocking.
56      */
57     public static class Grant {
58         private final Lock lock;
59         private boolean unlocked = false;
60         @Getter
61         private final String label;
62
63         Grant(Lock lock, String label) {
64             this.lock = lock;
65             this.label = label;
66         }
67
68         /**
69          * reactive unlocking. Submits the lock.
70          *
71          * @return the lock
72          */
73         public Mono<Lock> unlock() {
74             check();
75             return this.lock.unlock();
76         }
77
78         /**
79          * Synchronuous unlocking
80          */
81         public void unlockBlocking() {
82             check();
83             this.lock.unlockBlocking();
84         }
85
86         private void check() {
87             if (unlocked) {
88                 logger.error("Lock already unlocked");
89             }
90             unlocked = true;
91         }
92     }
93
94     /**
95      * Reactive lock. The Lock will be emitted when the lock is granted
96      *
97      * @param lockType type of lock (exclusive/shared)
98      * @param label a label that will be attached to the request. Will be passed
99      *        back in the Grant
100      * @return a Grant that cane be used only to unlock.
101      */
102     public synchronized Mono<Grant> lock(LockType lockType, String label) {
103         if (tryLock(lockType)) {
104             return Mono.just(new Grant(this, label));
105         } else {
106             return Mono.create(monoSink -> addToQueue(monoSink, lockType, label));
107         }
108     }
109
110     /**
111      * A synchronuous variant of locking. The caller thread will be blocked util the
112      * lock is granted.
113      */
114     public synchronized Grant lockBlocking(LockType locktype, String label) {
115         while (!tryLock(locktype)) {
116             this.waitForUnlock();
117         }
118         return new Grant(this, label);
119     }
120
121     public Mono<Lock> unlock() {
122         return Mono.create(monoSink -> {
123             unlockBlocking();
124             monoSink.success(this);
125         });
126     }
127
128     public synchronized void unlockBlocking() {
129         if (lockCounter <= 0) {
130             lockCounter = -1; // Might as well stop, to make it easier to find the problem
131             logger.error("Number of unlocks must match the number of locks");
132         }
133         this.lockCounter--;
134         if (lockCounter == 0) {
135             isExclusive = false;
136         }
137         this.notifyAll();
138         this.processQueuedEntries();
139     }
140
141     @Override
142     public synchronized String toString() {
143         return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
144                 + this.lockRequestQueue.size();
145     }
146
147     /** returns the current number of granted locks */
148     public synchronized int getLockCounter() {
149         return this.lockCounter;
150     }
151
152     private void processQueuedEntries() {
153         List<LockRequest> granted = new ArrayList<>();
154         while (!lockRequestQueue.isEmpty()) {
155             LockRequest request = lockRequestQueue.element();
156             if (tryLock(request.lockType)) {
157                 lockRequestQueue.remove();
158                 granted.add(request);
159             } else {
160                 break; // Avoid starvation
161             }
162         }
163         callbackProcessor.addAll(granted);
164     }
165
166     private synchronized void addToQueue(MonoSink<Grant> callback, LockType lockType, String label) {
167         lockRequestQueue.add(new LockRequest(callback, lockType, this, label));
168     }
169
170     @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
171     private synchronized void waitForUnlock() {
172         try {
173             this.wait();
174         } catch (InterruptedException e) {
175             logger.warn("waitForUnlock interrupted", e);
176             Thread.currentThread().interrupt();
177         }
178     }
179
180     private boolean tryLock(LockType lockType) {
181         if (this.isExclusive) {
182             return false;
183         }
184         if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
185             return false;
186         }
187         lockCounter++;
188         this.isExclusive = lockType == LockType.EXCLUSIVE;
189         return true;
190     }
191
192     /**
193      * Represents a queued lock request
194      */
195     private static class LockRequest {
196         final MonoSink<Grant> callback;
197         final LockType lockType;
198         final Lock lock;
199         final String label;
200
201         LockRequest(MonoSink<Grant> callback, LockType lockType, Lock lock, String label) {
202             this.callback = callback;
203             this.lockType = lockType;
204             this.lock = lock;
205             this.label = label;
206         }
207     }
208
209     /**
210      * A separate thread that calls a MonoSink to continue. This is done after a
211      * queued lock is granted.
212      */
213     private static class AsynchCallbackExecutor implements Runnable {
214         private List<LockRequest> lockRequestQueue = new LinkedList<>();
215
216         public AsynchCallbackExecutor() {
217             Thread thread = new Thread(this);
218             thread.start();
219         }
220
221         public synchronized void addAll(List<LockRequest> requests) {
222             this.lockRequestQueue.addAll(requests);
223             this.notifyAll();
224         }
225
226         @Override
227         public void run() {
228             try {
229                 while (true) {
230                     for (LockRequest request : consume()) {
231                         Grant g = new Grant(request.lock, request.label);
232                         request.callback.success(g);
233                     }
234                     waitForNewEntries();
235                 }
236             } catch (InterruptedException e) {
237                 Thread.currentThread().interrupt();
238                 logger.error("Interrupted {}", e.getMessage());
239             }
240         }
241
242         private synchronized List<LockRequest> consume() {
243             List<LockRequest> q = this.lockRequestQueue;
244             this.lockRequestQueue = new LinkedList<>();
245             return q;
246         }
247
248         @SuppressWarnings("java:S2274")
249         private synchronized void waitForNewEntries() throws InterruptedException {
250             if (this.lockRequestQueue.isEmpty()) {
251                 this.wait();
252             }
253         }
254     }
255 }