2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2016 - 2017 AT&T
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
22 * Licensed to the Apache Software Foundation (ASF) under one or more
23 * contributor license agreements. See the NOTICE file distributed with
24 * this work for additional information regarding copyright ownership.
25 * The ASF licenses this file to You under the Apache License, Version 2.0
26 * (the "License"); you may not use this file except in compliance with
27 * the License. You may obtain a copy of the License at
29 * http://www.apache.org/licenses/LICENSE-2.0
31 * Unless required by applicable law or agreed to in writing, software
32 * distributed under the License is distributed on an "AS IS" BASIS,
33 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
34 * See the License for the specific language governing permissions and
35 * limitations under the License.
37 package org.apache.tomcat.jdbc.pool;
39 import java.util.Collection;
40 import java.util.Iterator;
41 import java.util.LinkedList;
42 import java.util.NoSuchElementException;
43 import java.util.concurrent.BlockingQueue;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.Future;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.TimeoutException;
49 import java.util.concurrent.locks.ReentrantLock;
53 * A simple implementation of a blocking queue with fairness waiting.
54 * Invocations of method poll(...) are served in the order they were received.
55 * Locking is fine-grained: a shared lock is only used during the first level of contention, and waiting is done on a
56 * per-thread-lock basis so that order is guaranteed once the thread goes into a suspended monitor state.
58 * Not all of the methods of the {@link java.util.concurrent.BlockingQueue} are implemented.
60 * @param <E> Type of element in the queue
63 public class FairBlockingQueue<E> implements BlockingQueue<E> {
66 * This flag is used to reorder the calls to
67 * {@link java.util.concurrent.locks.Lock#lock()},
68 * {@link java.util.concurrent.locks.Lock#unlock()}
70 * {@link java.util.concurrent.CountDownLatch#countDown()}
71 * when {@link #offer(Object)} hands an item to a thread waiting in {@link #poll(long, TimeUnit)}.
72 * On Linux, it performs much better if we count down while we hold the global
73 * lock; on Solaris it's the other way around.
74 * Until we have tested other platforms we only check for Linux.
76 static final boolean isLinux = "Linux".equals(System.getProperty("os.name")) &&
77 (!Boolean.getBoolean(FairBlockingQueue.class.getName()+".ignoreOS"));
80 * Phase one entry lock. In order to give out
81 * per-thread locks for the waiting phase, we hold
82 * a phase one lock during the contention period.
84 final ReentrantLock lock = new ReentrantLock(false);
87 * All the objects in the pool are stored in a simple linked list
89 final LinkedList<E> items;
92 * All threads waiting for an object are stored in a linked list
94 final LinkedList<ExchangeCountDownLatch<E>> waiters;
97 * Creates a new fair blocking queue.
99 public FairBlockingQueue() {
100 items = new LinkedList<>();
101 waiters = new LinkedList<>();
104 //------------------------------------------------------------------
105 // USED BY CONPOOL IMPLEMENTATION
106 //------------------------------------------------------------------
108 * Will always return true, as the queue is unbounded.
112 public boolean offer(E e) {
113 //during the offer, we will grab the main lock
114 final ReentrantLock lock = this.lock;
116 ExchangeCountDownLatch<E> c = null;
118 //check to see if threads are waiting for an object
119 if (waiters.size() > 0) {
120 //if threads are waiting grab the latch for that thread
122 //give the object to the thread instead of adding it to the pool
124 if (isLinux) c.countDown();
126 //we always add first, so that the most recently used object will be given out
132 //if we exchanged an object with another thread, wake it up.
133 if (!isLinux && c!=null) c.countDown();
134 //we have an unbounded queue, so always return true
139 * Will never time out, as it invokes the {@link #offer(Object)} method.
140 * Once a lock has been acquired, the
144 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
149 * Fair retrieval of an object in the queue.
150 * Objects are returned in the order the threads requested them.
154 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
156 final ReentrantLock lock = this.lock;
157 //acquire the global lock until we know what to do
160 //check to see if we have objects
161 result = items.poll();
162 if (result==null && timeout>0) {
163 //the queue is empty we will wait for an object
164 ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
165 //add to the bottom of the wait list
167 //unlock the global lock
169 boolean didtimeout = true;
170 InterruptedException interruptedException = null;
172 //wait for the specified timeout
173 didtimeout = !c.await(timeout, unit);
174 } catch (InterruptedException ix) {
175 interruptedException = ix;
178 //if we timed out, or got interrupted
179 // remove ourselves from the waitlist
187 //return the item we received, can be null if we timed out
188 result = c.getItem();
189 if (null!=interruptedException) {
192 //we got a result - clear the interrupt status
193 //don't propagate cause we have removed a connection from pool
194 Thread.interrupted();
196 throw interruptedException;
200 //we have an object, release
204 if (lock.isHeldByCurrentThread()) {
212 * Request an item from the queue asynchronously
213 * @return - a future pending the result from the queue poll request
215 public Future<E> pollAsync() {
216 Future<E> result = null;
217 final ReentrantLock lock = this.lock;
218 //grab the global lock
221 //check to see if we have objects in the queue
222 E item = items.poll();
224 //queue is empty, add ourselves as waiters
225 ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
227 //return a future that will wait for the object
228 result = new ItemFuture<>(c);
230 //return a future with the item
231 result = new ItemFuture<>(item);
243 public boolean remove(Object e) {
244 final ReentrantLock lock = this.lock;
247 return items.remove(e);
265 public Iterator<E> iterator() {
266 return new FairIterator();
274 final ReentrantLock lock = this.lock;
287 public boolean contains(Object e) {
288 final ReentrantLock lock = this.lock;
291 return items.contains(e);
298 //------------------------------------------------------------------
299 // NOT USED BY CONPOOL IMPLEMENTATION
300 //------------------------------------------------------------------
305 public boolean add(E e) {
311 * @throws UnsupportedOperationException - this operation is not supported
314 public int drainTo(Collection<? super E> c, int maxElements) {
315 throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)");
320 * @throws UnsupportedOperationException - this operation is not supported
324 public int drainTo(Collection<? super E> c) {
325 return drainTo(c,Integer.MAX_VALUE);
332 public void put(E e) throws InterruptedException {
340 public int remainingCapacity() {
341 return Integer.MAX_VALUE - size();
348 public E take() throws InterruptedException {
349 return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
356 public boolean addAll(Collection<? extends E> c) {
357 Iterator<? extends E> i = c.iterator();
358 while (i.hasNext()) {
367 * @throws UnsupportedOperationException - this operation is not supported
370 public void clear() {
371 throw new UnsupportedOperationException("void clear()");
377 * @throws UnsupportedOperationException - this operation is not supported
380 public boolean containsAll(Collection<?> c) {
381 throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)");
388 public boolean isEmpty() {
394 * @throws UnsupportedOperationException - this operation is not supported
397 public boolean removeAll(Collection<?> c) {
398 throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)");
403 * @throws UnsupportedOperationException - this operation is not supported
406 public boolean retainAll(Collection<?> c) {
407 throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)");
412 * @throws UnsupportedOperationException - this operation is not supported
415 public Object[] toArray() {
416 throw new UnsupportedOperationException("Object[] toArray()");
421 * @throws UnsupportedOperationException - this operation is not supported
424 public <T> T[] toArray(T[] a) {
425 throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
430 * @throws UnsupportedOperationException - this operation is not supported
434 throw new UnsupportedOperationException("E element()");
439 * @throws UnsupportedOperationException - this operation is not supported
443 throw new UnsupportedOperationException("E peek()");
448 * @throws UnsupportedOperationException - this operation is not supported
452 throw new UnsupportedOperationException("E remove()");
457 //------------------------------------------------------------------
458 // Non-cancellable Future used to check whether a connection has been made available
459 //------------------------------------------------------------------
460 protected class ItemFuture<T> implements Future<T> {
461 protected volatile T item = null;
462 protected volatile ExchangeCountDownLatch<T> latch = null;
463 protected volatile boolean canceled = false;
465 public ItemFuture(T item) {
469 public ItemFuture(ExchangeCountDownLatch<T> latch) {
474 public boolean cancel(boolean mayInterruptIfRunning) {
475 return false; //don't allow cancel for now
479 public T get() throws InterruptedException, ExecutionException {
482 } else if (latch!=null) {
484 return latch.getItem();
486 throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
491 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
494 } else if (latch!=null) {
495 boolean timedout = !latch.await(timeout, unit);
496 if (timedout) throw new TimeoutException();
497 else return latch.getItem();
499 throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
504 public boolean isCancelled() {
509 public boolean isDone() {
510 return (item!=null || latch.getItem()!=null);
515 //------------------------------------------------------------------
516 // Count down latch that can be used to exchange information
517 //------------------------------------------------------------------
518 protected class ExchangeCountDownLatch<T> extends CountDownLatch {
519 protected volatile T item;
520 public ExchangeCountDownLatch(int i) {
526 public void setItem(T item) {
531 //------------------------------------------------------------------
532 // Iterator safe from concurrent modification exceptions
533 //------------------------------------------------------------------
534 protected class FairIterator implements Iterator<E> {
539 @SuppressWarnings("unchecked") // Can't create arrays of generic types
540 public FairIterator() {
541 final ReentrantLock lock = FairBlockingQueue.this.lock;
544 elements = (E[]) new Object[FairBlockingQueue.this.items.size()];
545 FairBlockingQueue.this.items.toArray(elements);
552 public boolean hasNext() {
553 return index<elements.length;
559 throw new NoSuchElementException();
561 element = elements[index++];
566 public void remove() {
567 final ReentrantLock lock = FairBlockingQueue.this.lock;
571 FairBlockingQueue.this.items.remove(element);