[SDNC-5] Rebase sdnc-core
[sdnc/core.git] / dblib / common / src / main / java / org / apache / tomcat / jdbc / pool / FairBlockingQueue.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * openecomp
4  * ================================================================================
5  * Copyright (C) 2016 - 2017 AT&T
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 /*
22  * Licensed to the Apache Software Foundation (ASF) under one or more
23  * contributor license agreements.  See the NOTICE file distributed with
24  * this work for additional information regarding copyright ownership.
25  * The ASF licenses this file to You under the Apache License, Version 2.0
26  * (the "License"); you may not use this file except in compliance with
27  * the License.  You may obtain a copy of the License at
28  *
29  *      http://www.apache.org/licenses/LICENSE-2.0
30  *
31  * Unless required by applicable law or agreed to in writing, software
32  * distributed under the License is distributed on an "AS IS" BASIS,
33  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
34  * See the License for the specific language governing permissions and
35  * limitations under the License.
36  */
37 package org.apache.tomcat.jdbc.pool;
38
39 import java.util.Collection;
40 import java.util.Iterator;
41 import java.util.LinkedList;
42 import java.util.NoSuchElementException;
43 import java.util.concurrent.BlockingQueue;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.Future;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import java.util.concurrent.locks.ReentrantLock;
50
51 /**
52  *
53  * A simple implementation of a blocking queue with fairness waiting.
54  * invocations to method poll(...) will get handed out in the order they were received.
55  * Locking is fine grained, a shared lock is only used during the first level of contention, waiting is done in a
56  * lock per thread basis so that order is guaranteed once the thread goes into a suspended monitor state.
57  * <br>
58  * Not all of the methods of the {@link java.util.concurrent.BlockingQueue} are implemented.
59  *
60  * @param <E> Type of element in the queue
61  */
62
63 public class FairBlockingQueue<E> implements BlockingQueue<E> {
64
65     /**
66      * This little sucker is used to reorder the way to do
67      * {@link java.util.concurrent.locks.Lock#lock()},
68      * {@link java.util.concurrent.locks.Lock#unlock()}
69      * and
70      * {@link java.util.concurrent.CountDownLatch#countDown()}
71      * during the {@link #poll(long, TimeUnit)} operation.
72      * On Linux, it performs much better if we count down while we hold the global
73      * lock, on Solaris its the other way around.
74      * Until we have tested other platforms we only check for Linux.
75      */
76     static final boolean isLinux = "Linux".equals(System.getProperty("os.name")) &&
77                                    (!Boolean.getBoolean(FairBlockingQueue.class.getName()+".ignoreOS"));
78
79     /**
80      * Phase one entry lock in order to give out
81      * per-thread-locks for the waiting phase we have
82      * a phase one lock during the contention period.
83      */
84     final ReentrantLock lock = new ReentrantLock(false);
85
86     /**
87      * All the objects in the pool are stored in a simple linked list
88      */
89     final LinkedList<E> items;
90
91     /**
92      * All threads waiting for an object are stored in a linked list
93      */
94     final LinkedList<ExchangeCountDownLatch<E>> waiters;
95
96     /**
97      * Creates a new fair blocking queue.
98      */
99     public FairBlockingQueue() {
100         items = new LinkedList<>();
101         waiters = new LinkedList<>();
102     }
103
104     //------------------------------------------------------------------
105     // USED BY CONPOOL IMPLEMENTATION
106     //------------------------------------------------------------------
107     /**
108      * Will always return true, queue is unbounded.
109      * {@inheritDoc}
110      */
111     @Override
112     public boolean offer(E e) {
113         //during the offer, we will grab the main lock
114         final ReentrantLock lock = this.lock;
115         lock.lock();
116         ExchangeCountDownLatch<E> c = null;
117         try {
118             //check to see if threads are waiting for an object
119             if (waiters.size() > 0) {
120                 //if threads are waiting grab the latch for that thread
121                 c = waiters.poll();
122                 //give the object to the thread instead of adding it to the pool
123                 c.setItem(e);
124                 if (isLinux) c.countDown();
125             } else {
126                 //we always add first, so that the most recently used object will be given out
127                 items.addFirst(e);
128             }
129         } finally {
130             lock.unlock();
131         }
132         //if we exchanged an object with another thread, wake it up.
133         if (!isLinux && c!=null) c.countDown();
134         //we have an unbounded queue, so always return true
135         return true;
136     }
137
138     /**
139      * Will never timeout, as it invokes the {@link #offer(Object)} method.
140      * Once a lock has been acquired, the
141      * {@inheritDoc}
142      */
143     @Override
144     public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
145         return offer(e);
146     }
147
148     /**
149      * Fair retrieval of an object in the queue.
150      * Objects are returned in the order the threads requested them.
151      * {@inheritDoc}
152      */
153     @Override
154     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
155         E result = null;
156         final ReentrantLock lock = this.lock;
157         //acquire the global lock until we know what to do
158         lock.lock();
159         try {
160             //check to see if we have objects
161             result = items.poll();
162             if (result==null && timeout>0) {
163                 //the queue is empty we will wait for an object
164                 ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
165                 //add to the bottom of the wait list
166                 waiters.addLast(c);
167                 //unlock the global lock
168                 lock.unlock();
169                 boolean didtimeout = true;
170                 InterruptedException interruptedException = null;
171                 try {
172                     //wait for the specified timeout
173                     didtimeout = !c.await(timeout, unit);
174                 } catch (InterruptedException ix) {
175                     interruptedException = ix;
176                 }
177                 if (didtimeout) {
178                     //if we timed out, or got interrupted
179                     // remove ourselves from the waitlist
180                     lock.lock();
181                     try {
182                         waiters.remove(c);
183                     } finally {
184                         lock.unlock();
185                     }
186                 }
187                 //return the item we received, can be null if we timed out
188                 result = c.getItem();
189                 if (null!=interruptedException) {
190                     //we got interrupted
191                     if ( null!=result) {
192                         //we got a result - clear the interrupt status
193                         //don't propagate cause we have removed a connection from pool
194                         Thread.interrupted();
195                     } else {
196                         throw interruptedException;
197                     }
198                 }
199             } else {
200                 //we have an object, release
201                 lock.unlock();
202             }
203         } finally {
204             if (lock.isHeldByCurrentThread()) {
205                 lock.unlock();
206             }
207         }
208         return result;
209     }
210
211     /**
212      * Request an item from the queue asynchronously
213      * @return - a future pending the result from the queue poll request
214      */
215     public Future<E> pollAsync() {
216         Future<E> result = null;
217         final ReentrantLock lock = this.lock;
218         //grab the global lock
219         lock.lock();
220         try {
221             //check to see if we have objects in the queue
222             E item = items.poll();
223             if (item==null) {
224                 //queue is empty, add ourselves as waiters
225                 ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
226                 waiters.addLast(c);
227                 //return a future that will wait for the object
228                 result = new ItemFuture<>(c);
229             } else {
230                 //return a future with the item
231                 result = new ItemFuture<>(item);
232             }
233         } finally {
234             lock.unlock();
235         }
236         return result;
237     }
238
239     /**
240      * {@inheritDoc}
241      */
242     @Override
243     public boolean remove(Object e) {
244         final ReentrantLock lock = this.lock;
245         lock.lock();
246         try {
247             return items.remove(e);
248         } finally {
249             lock.unlock();
250         }
251     }
252
253     /**
254      * {@inheritDoc}
255      */
256     @Override
257     public int size() {
258         return items.size();
259     }
260
261     /**
262      * {@inheritDoc}
263      */
264     @Override
265     public Iterator<E> iterator() {
266         return new FairIterator();
267     }
268
269     /**
270      * {@inheritDoc}
271      */
272     @Override
273     public E poll() {
274         final ReentrantLock lock = this.lock;
275         lock.lock();
276         try {
277             return items.poll();
278         } finally {
279             lock.unlock();
280         }
281     }
282
283     /**
284      * {@inheritDoc}
285      */
286     @Override
287     public boolean contains(Object e) {
288         final ReentrantLock lock = this.lock;
289         lock.lock();
290         try {
291             return items.contains(e);
292         } finally {
293             lock.unlock();
294         }
295     }
296
297
298     //------------------------------------------------------------------
299     // NOT USED BY CONPOOL IMPLEMENTATION
300     //------------------------------------------------------------------
301     /**
302      * {@inheritDoc}
303      */
304     @Override
305     public boolean add(E e) {
306         return offer(e);
307     }
308
309     /**
310      * {@inheritDoc}
311      * @throws UnsupportedOperationException - this operation is not supported
312      */
313     @Override
314     public int drainTo(Collection<? super E> c, int maxElements) {
315         throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)");
316     }
317
318     /**
319      * {@inheritDoc}
320      * @throws UnsupportedOperationException - this operation is not supported
321      */
322
323     @Override
324     public int drainTo(Collection<? super E> c) {
325         return drainTo(c,Integer.MAX_VALUE);
326     }
327
328     /**
329      * {@inheritDoc}
330      */
331     @Override
332     public void put(E e) throws InterruptedException {
333         offer(e);
334     }
335
336     /**
337      * {@inheritDoc}
338      */
339     @Override
340     public int remainingCapacity() {
341         return Integer.MAX_VALUE - size();
342     }
343
344     /**
345      * {@inheritDoc}
346      */
347     @Override
348     public E take() throws InterruptedException {
349         return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
350     }
351
352     /**
353      * {@inheritDoc}
354      */
355     @Override
356     public boolean addAll(Collection<? extends E> c) {
357         Iterator<? extends E> i = c.iterator();
358         while (i.hasNext()) {
359             E e = i.next();
360             offer(e);
361         }
362         return true;
363     }
364
365     /**
366      * {@inheritDoc}
367      * @throws UnsupportedOperationException - this operation is not supported
368      */
369     @Override
370     public void clear() {
371         throw new UnsupportedOperationException("void clear()");
372
373     }
374
375     /**
376      * {@inheritDoc}
377      * @throws UnsupportedOperationException - this operation is not supported
378      */
379     @Override
380     public boolean containsAll(Collection<?> c) {
381         throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)");
382     }
383
384     /**
385      * {@inheritDoc}
386      */
387     @Override
388     public boolean isEmpty() {
389         return size() == 0;
390     }
391
392     /**
393      * {@inheritDoc}
394      * @throws UnsupportedOperationException - this operation is not supported
395      */
396     @Override
397     public boolean removeAll(Collection<?> c) {
398         throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)");
399     }
400
401     /**
402      * {@inheritDoc}
403      * @throws UnsupportedOperationException - this operation is not supported
404      */
405     @Override
406     public boolean retainAll(Collection<?> c) {
407         throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)");
408     }
409
410     /**
411      * {@inheritDoc}
412      * @throws UnsupportedOperationException - this operation is not supported
413      */
414     @Override
415     public Object[] toArray() {
416         throw new UnsupportedOperationException("Object[] toArray()");
417     }
418
419     /**
420      * {@inheritDoc}
421      * @throws UnsupportedOperationException - this operation is not supported
422      */
423     @Override
424     public <T> T[] toArray(T[] a) {
425         throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
426     }
427
428     /**
429      * {@inheritDoc}
430      * @throws UnsupportedOperationException - this operation is not supported
431      */
432     @Override
433     public E element() {
434         throw new UnsupportedOperationException("E element()");
435     }
436
437     /**
438      * {@inheritDoc}
439      * @throws UnsupportedOperationException - this operation is not supported
440      */
441     @Override
442     public E peek() {
443         throw new UnsupportedOperationException("E peek()");
444     }
445
446     /**
447      * {@inheritDoc}
448      * @throws UnsupportedOperationException - this operation is not supported
449      */
450     @Override
451     public E remove() {
452         throw new UnsupportedOperationException("E remove()");
453     }
454
455
456
457     //------------------------------------------------------------------
458     // Non cancellable Future used to check and see if a connection has been made available
459     //------------------------------------------------------------------
460     protected class ItemFuture<T> implements Future<T> {
461         protected volatile T item = null;
462         protected volatile ExchangeCountDownLatch<T> latch = null;
463         protected volatile boolean canceled = false;
464
465         public ItemFuture(T item) {
466             this.item = item;
467         }
468
469         public ItemFuture(ExchangeCountDownLatch<T> latch) {
470             this.latch = latch;
471         }
472
473         @Override
474         public boolean cancel(boolean mayInterruptIfRunning) {
475             return false; //don't allow cancel for now
476         }
477
478         @Override
479         public T get() throws InterruptedException, ExecutionException {
480             if (item!=null) {
481                 return item;
482             } else if (latch!=null) {
483                 latch.await();
484                 return latch.getItem();
485             } else {
486                 throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
487             }
488         }
489
490         @Override
491         public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
492             if (item!=null) {
493                 return item;
494             } else if (latch!=null) {
495                 boolean timedout = !latch.await(timeout, unit);
496                 if (timedout) throw new TimeoutException();
497                 else return latch.getItem();
498             } else {
499                 throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
500             }
501         }
502
503         @Override
504         public boolean isCancelled() {
505             return false;
506         }
507
508         @Override
509         public boolean isDone() {
510             return (item!=null || latch.getItem()!=null);
511         }
512
513     }
514
515     //------------------------------------------------------------------
516     // Count down latch that can be used to exchange information
517     //------------------------------------------------------------------
518     protected class ExchangeCountDownLatch<T> extends CountDownLatch {
519         protected volatile T item;
520         public ExchangeCountDownLatch(int i) {
521             super(i);
522         }
523         public T getItem() {
524             return item;
525         }
526         public void setItem(T item) {
527             this.item = item;
528         }
529     }
530
531     //------------------------------------------------------------------
532     // Iterator safe from concurrent modification exceptions
533     //------------------------------------------------------------------
534     protected class FairIterator implements Iterator<E> {
535         E[] elements = null;
536         int index;
537         E element = null;
538
539         @SuppressWarnings("unchecked") // Can't create arrays of generic types
540         public FairIterator() {
541             final ReentrantLock lock = FairBlockingQueue.this.lock;
542             lock.lock();
543             try {
544                 elements = (E[]) new Object[FairBlockingQueue.this.items.size()];
545                 FairBlockingQueue.this.items.toArray(elements);
546                 index = 0;
547             } finally {
548                 lock.unlock();
549             }
550         }
551         @Override
552         public boolean hasNext() {
553             return index<elements.length;
554         }
555
556         @Override
557         public E next() {
558             if (!hasNext()) {
559                 throw new NoSuchElementException();
560             }
561             element = elements[index++];
562             return element;
563         }
564
565         @Override
566         public void remove() {
567             final ReentrantLock lock = FairBlockingQueue.this.lock;
568             lock.lock();
569             try {
570                 if (element!=null) {
571                     FairBlockingQueue.this.items.remove(element);
572                 }
573             } finally {
574                 lock.unlock();
575             }
576         }
577
578     }
579 }