[SDNC-5] Rebase sdnc-core
[sdnc/core.git] / dblib / common / src / main / java / org / apache / tomcat / jdbc / pool / MultiLockFairBlockingQueue.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * openecomp
4  * ================================================================================
5  * Copyright (C) 2016 - 2017 AT&T
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 /*
22  * Licensed to the Apache Software Foundation (ASF) under one or more
23  * contributor license agreements.  See the NOTICE file distributed with
24  * this work for additional information regarding copyright ownership.
25  * The ASF licenses this file to You under the Apache License, Version 2.0
26  * (the "License"); you may not use this file except in compliance with
27  * the License.  You may obtain a copy of the License at
28  *
29  *      http://www.apache.org/licenses/LICENSE-2.0
30  *
31  * Unless required by applicable law or agreed to in writing, software
32  * distributed under the License is distributed on an "AS IS" BASIS,
33  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
34  * See the License for the specific language governing permissions and
35  * limitations under the License.
36  */
37 package org.apache.tomcat.jdbc.pool;
38
39 import java.util.ArrayList;
40 import java.util.Collection;
41 import java.util.Iterator;
42 import java.util.LinkedList;
43 import java.util.NoSuchElementException;
44 import java.util.concurrent.BlockingQueue;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.ExecutionException;
47 import java.util.concurrent.Future;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.TimeoutException;
50 import java.util.concurrent.atomic.AtomicInteger;
51 import java.util.concurrent.locks.ReentrantLock;
52
53 /**
54  * <b>EXPERIMENTAL AND NOT YET COMPLETE!</b>
55  *
56  *
57  * An implementation of a blocking queue with fairness waiting and lock dispersal to avoid contention.
58  * invocations to method poll(...) will get handed out in the order they were received.
59  * Locking is fine grained, a shared lock is only used during the first level of contention, waiting is done in a
60  * lock per thread basis so that order is guaranteed once the thread goes into a suspended monitor state.
61  * <br>
62  * Not all of the methods of the {@link java.util.concurrent.BlockingQueue} are implemented.
63  *
64  * @param <E> Type of element in the queue
65  */
66
67 public class MultiLockFairBlockingQueue<E> implements BlockingQueue<E> {
68
69     final int LOCK_COUNT = Runtime.getRuntime().availableProcessors();
70
71     final AtomicInteger putQueue = new AtomicInteger(0);
72     final AtomicInteger pollQueue = new AtomicInteger(0);
73
74     public int getNextPut() {
75         int idx = Math.abs(putQueue.incrementAndGet()) % LOCK_COUNT;
76         return idx;
77     }
78
79     public int getNextPoll() {
80         int idx = Math.abs(pollQueue.incrementAndGet()) % LOCK_COUNT;
81         return idx;
82     }
83     /**
84      * Phase one entry lock in order to give out
85      * per-thread-locks for the waiting phase we have
86      * a phase one lock during the contention period.
87      */
88     private final ReentrantLock[] locks = new ReentrantLock[LOCK_COUNT];
89
90     /**
91      * All the objects in the pool are stored in a simple linked list
92      */
93     final LinkedList<E>[] items;
94
95     /**
96      * All threads waiting for an object are stored in a linked list
97      */
98     final LinkedList<ExchangeCountDownLatch<E>>[] waiters;
99
100     /**
101      * Creates a new fair blocking queue.
102      */
103     @SuppressWarnings("unchecked") // Can create arrays of generic types
104     public MultiLockFairBlockingQueue() {
105         items = new LinkedList[LOCK_COUNT];
106         waiters = new LinkedList[LOCK_COUNT];
107         for (int i=0; i<LOCK_COUNT; i++) {
108             items[i] = new LinkedList<>();
109             waiters[i] = new LinkedList<>();
110             locks[i] = new ReentrantLock(false);
111         }
112     }
113
114     //------------------------------------------------------------------
115     // USED BY CONPOOL IMPLEMENTATION
116     //------------------------------------------------------------------
117     /**
118      * Will always return true, queue is unbounded.
119      * {@inheritDoc}
120      */
121     @Override
122     public boolean offer(E e) {
123         int idx = getNextPut();
124         //during the offer, we will grab the main lock
125         final ReentrantLock lock = this.locks[idx];
126         lock.lock();
127         ExchangeCountDownLatch<E> c = null;
128         try {
129             //check to see if threads are waiting for an object
130             if (waiters[idx].size() > 0) {
131                 //if threads are waiting grab the latch for that thread
132                 c = waiters[idx].poll();
133                 //give the object to the thread instead of adding it to the pool
134                 c.setItem(e);
135             } else {
136                 //we always add first, so that the most recently used object will be given out
137                 items[idx].addFirst(e);
138             }
139         } finally {
140             lock.unlock();
141         }
142         //if we exchanged an object with another thread, wake it up.
143         if (c!=null) c.countDown();
144         //we have an unbounded queue, so always return true
145         return true;
146     }
147
148     /**
149      * Will never timeout, as it invokes the {@link #offer(Object)} method.
150      * Once a lock has been acquired, the
151      * {@inheritDoc}
152      */
153     @Override
154     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
155         return offer(e);
156     }
157
158     /**
159      * Fair retrieval of an object in the queue.
160      * Objects are returned in the order the threads requested them.
161      * {@inheritDoc}
162      */
163     @Override
164     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
165         int idx = getNextPoll();
166         E result = null;
167         final ReentrantLock lock = this.locks[idx];
168         try {
169             //acquire the global lock until we know what to do
170             lock.lock();
171             //check to see if we have objects
172             result = items[idx].poll();
173             if (result==null && timeout>0) {
174                 //the queue is empty we will wait for an object
175                 ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
176                 //add to the bottom of the wait list
177                 waiters[idx].addLast(c);
178                 //unlock the global lock
179                 lock.unlock();
180                 //wait for the specified timeout
181                 if (!c.await(timeout, unit)) {
182                     //if we timed out, remove ourselves from the waitlist
183                     lock.lock();
184                     waiters[idx].remove(c);
185                     lock.unlock();
186                 }
187                 //return the item we received, can be null if we timed out
188                 result = c.getItem();
189             } else {
190                 //we have an object, release
191                 lock.unlock();
192             }
193         } finally {
194             if (lock.isHeldByCurrentThread()) {
195                 lock.unlock();
196             }
197         }
198         return result;
199     }
200
201     /**
202      * Request an item from the queue asynchronously
203      * @return - a future pending the result from the queue poll request
204      */
205     public Future<E> pollAsync() {
206         int idx = getNextPoll();
207         Future<E> result = null;
208         final ReentrantLock lock = this.locks[idx];
209         try {
210             //grab the global lock
211             lock.lock();
212             //check to see if we have objects in the queue
213             E item = items[idx].poll();
214             if (item==null) {
215                 //queue is empty, add ourselves as waiters
216                 ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
217                 waiters[idx].addLast(c);
218                 //return a future that will wait for the object
219                 result = new ItemFuture<>(c);
220             } else {
221                 //return a future with the item
222                 result = new ItemFuture<>(item);
223             }
224         } finally {
225             lock.unlock();
226         }
227         return result;
228     }
229
230     /**
231      * {@inheritDoc}
232      */
233     @Override
234     public boolean remove(Object e) {
235         for (int idx=0; idx<LOCK_COUNT; idx++) {
236             final ReentrantLock lock = this.locks[idx];
237             lock.lock();
238             try {
239                 boolean result = items[idx].remove(e);
240                 if (result) return result;
241             } finally {
242                 lock.unlock();
243             }
244         }
245         return false;
246     }
247
248     /**
249      * {@inheritDoc}
250      */
251     @Override
252     public int size() {
253         int size = 0;
254         for (int idx=0; idx<LOCK_COUNT; idx++) {
255             size += items[idx].size();
256         }
257         return size;
258     }
259
260     /**
261      * {@inheritDoc}
262      */
263     @Override
264     public Iterator<E> iterator() {
265         return new FairIterator();
266     }
267
268     /**
269      * {@inheritDoc}
270      */
271     @Override
272     public E poll() {
273         int idx = getNextPoll();
274         final ReentrantLock lock = this.locks[idx];
275         lock.lock();
276         try {
277             return items[idx].poll();
278         } finally {
279             lock.unlock();
280         }
281     }
282
283     /**
284      * {@inheritDoc}
285      */
286     @Override
287     public boolean contains(Object e) {
288         for (int idx=0; idx<LOCK_COUNT; idx++) {
289             boolean result = items[idx].contains(e);
290             if (result) return result;
291         }
292         return false;
293     }
294
295
296     //------------------------------------------------------------------
297     // NOT USED BY CONPOOL IMPLEMENTATION
298     //------------------------------------------------------------------
299     /**
300      * {@inheritDoc}
301      */
302     @Override
303     public boolean add(E e) {
304         return offer(e);
305     }
306
307     /**
308      * {@inheritDoc}
309      * @throws UnsupportedOperationException - this operation is not supported
310      */
311     @Override
312     public int drainTo(Collection<? super E> c, int maxElements) {
313         throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)");
314     }
315
316     /**
317      * {@inheritDoc}
318      * @throws UnsupportedOperationException - this operation is not supported
319      */
320     @Override
321     public int drainTo(Collection<? super E> c) {
322         return drainTo(c,Integer.MAX_VALUE);
323     }
324
325     /**
326      * {@inheritDoc}
327      */
328     @Override
329     public void put(E e) throws InterruptedException {
330         offer(e);
331     }
332
333     /**
334      * {@inheritDoc}
335      */
336     @Override
337     public int remainingCapacity() {
338         return Integer.MAX_VALUE - size();
339     }
340
341     /**
342      * {@inheritDoc}
343      */
344     @Override
345     public E take() throws InterruptedException {
346         return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
347     }
348
349     /**
350      * {@inheritDoc}
351      */
352     @Override
353     public boolean addAll(Collection<? extends E> c) {
354         Iterator<? extends E> i = c.iterator();
355         while (i.hasNext()) {
356             E e = i.next();
357             offer(e);
358         }
359         return true;
360     }
361
362     /**
363      * {@inheritDoc}
364      * @throws UnsupportedOperationException - this operation is not supported
365      */
366     @Override
367     public void clear() {
368         throw new UnsupportedOperationException("void clear()");
369
370     }
371
372     /**
373      * {@inheritDoc}
374      * @throws UnsupportedOperationException - this operation is not supported
375      */
376     @Override
377     public boolean containsAll(Collection<?> c) {
378         throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)");
379     }
380
381     /**
382      * {@inheritDoc}
383      */
384     @Override
385     public boolean isEmpty() {
386         return size() == 0;
387     }
388
389     /**
390      * {@inheritDoc}
391      * @throws UnsupportedOperationException - this operation is not supported
392      */
393     @Override
394     public boolean removeAll(Collection<?> c) {
395         throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)");
396     }
397
398     /**
399      * {@inheritDoc}
400      * @throws UnsupportedOperationException - this operation is not supported
401      */
402     @Override
403     public boolean retainAll(Collection<?> c) {
404         throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)");
405     }
406
407     /**
408      * {@inheritDoc}
409      * @throws UnsupportedOperationException - this operation is not supported
410      */
411     @Override
412     public Object[] toArray() {
413         throw new UnsupportedOperationException("Object[] toArray()");
414     }
415
416     /**
417      * {@inheritDoc}
418      * @throws UnsupportedOperationException - this operation is not supported
419      */
420     @Override
421     public <T> T[] toArray(T[] a) {
422         throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
423     }
424
425     /**
426      * {@inheritDoc}
427      * @throws UnsupportedOperationException - this operation is not supported
428      */
429     @Override
430     public E element() {
431         throw new UnsupportedOperationException("E element()");
432     }
433
434     /**
435      * {@inheritDoc}
436      * @throws UnsupportedOperationException - this operation is not supported
437      */
438     @Override
439     public E peek() {
440         throw new UnsupportedOperationException("E peek()");
441     }
442
443     /**
444      * {@inheritDoc}
445      * @throws UnsupportedOperationException - this operation is not supported
446      */
447     @Override
448     public E remove() {
449         throw new UnsupportedOperationException("E remove()");
450     }
451
452
453
454     //------------------------------------------------------------------
455     // Non cancellable Future used to check and see if a connection has been made available
456     //------------------------------------------------------------------
457     protected class ItemFuture<T> implements Future<T> {
458         protected volatile T item = null;
459         protected volatile ExchangeCountDownLatch<T> latch = null;
460         protected volatile boolean canceled = false;
461
462         public ItemFuture(T item) {
463             this.item = item;
464         }
465
466         public ItemFuture(ExchangeCountDownLatch<T> latch) {
467             this.latch = latch;
468         }
469
470         @Override
471         public boolean cancel(boolean mayInterruptIfRunning) {
472             return false; //don't allow cancel for now
473         }
474
475         @Override
476         public T get() throws InterruptedException, ExecutionException {
477             if (item!=null) {
478                 return item;
479             } else if (latch!=null) {
480                 latch.await();
481                 return latch.getItem();
482             } else {
483                 throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
484             }
485         }
486
487         @Override
488         public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
489             if (item!=null) {
490                 return item;
491             } else if (latch!=null) {
492                 boolean timedout = !latch.await(timeout, unit);
493                 if (timedout) throw new TimeoutException();
494                 else return latch.getItem();
495             } else {
496                 throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
497             }
498         }
499
500         @Override
501         public boolean isCancelled() {
502             return false;
503         }
504
505         @Override
506         public boolean isDone() {
507             return (item!=null || latch.getItem()!=null);
508         }
509
510     }
511
512     //------------------------------------------------------------------
513     // Count down latch that can be used to exchange information
514     //------------------------------------------------------------------
515     protected class ExchangeCountDownLatch<T> extends CountDownLatch {
516         protected volatile T item;
517         public ExchangeCountDownLatch(int i) {
518             super(i);
519         }
520         public T getItem() {
521             return item;
522         }
523         public void setItem(T item) {
524             this.item = item;
525         }
526     }
527
528     //------------------------------------------------------------------
529     // Iterator safe from concurrent modification exceptions
530     //------------------------------------------------------------------
531     protected class FairIterator implements Iterator<E> {
532         E[] elements = null;
533         int index;
534         E element = null;
535
536         @SuppressWarnings("unchecked") // Can't create arrays of generic types
537         public FairIterator() {
538             ArrayList<E> list = new ArrayList<>(MultiLockFairBlockingQueue.this.size());
539             for (int idx=0; idx<LOCK_COUNT; idx++) {
540                 final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx];
541                 lock.lock();
542                 try {
543                     elements = (E[]) new Object[MultiLockFairBlockingQueue.this.items[idx].size()];
544                     MultiLockFairBlockingQueue.this.items[idx].toArray(elements);
545
546                 } finally {
547                     lock.unlock();
548                 }
549             }
550             index = 0;
551             elements = (E[]) new Object[list.size()];
552             list.toArray(elements);
553         }
554         @Override
555         public boolean hasNext() {
556             return index<elements.length;
557         }
558
559         @Override
560         public E next() {
561             if (!hasNext()) {
562                 throw new NoSuchElementException();
563             }
564             element = elements[index++];
565             return element;
566         }
567
568         @Override
569         public void remove() {
570             for (int idx=0; idx<LOCK_COUNT; idx++) {
571                 final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx];
572                 lock.lock();
573                 try {
574                     boolean result = MultiLockFairBlockingQueue.this.items[idx].remove(elements[index]);
575                     if (result) break;
576                 } finally {
577                     lock.unlock();
578                 }
579             }
580
581         }
582
583     }
584 }