2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2016 - 2017 AT&T
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
22 * Licensed to the Apache Software Foundation (ASF) under one or more
23 * contributor license agreements. See the NOTICE file distributed with
24 * this work for additional information regarding copyright ownership.
25 * The ASF licenses this file to You under the Apache License, Version 2.0
26 * (the "License"); you may not use this file except in compliance with
27 * the License. You may obtain a copy of the License at
29 * http://www.apache.org/licenses/LICENSE-2.0
31 * Unless required by applicable law or agreed to in writing, software
32 * distributed under the License is distributed on an "AS IS" BASIS,
33 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
34 * See the License for the specific language governing permissions and
35 * limitations under the License.
37 package org.apache.tomcat.jdbc.pool;
39 import java.util.ArrayList;
40 import java.util.Collection;
41 import java.util.Iterator;
42 import java.util.LinkedList;
43 import java.util.NoSuchElementException;
44 import java.util.concurrent.BlockingQueue;
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.ExecutionException;
47 import java.util.concurrent.Future;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.TimeoutException;
50 import java.util.concurrent.atomic.AtomicInteger;
51 import java.util.concurrent.locks.ReentrantLock;
54 * <b>EXPERIMENTAL AND NOT YET COMPLETE!</b>
57 * An implementation of a blocking queue with fair waiting and lock dispersal to avoid contention.
58 * Invocations of method poll(...) are served in the order in which they were received.
59 * Locking is fine-grained: a shared lock is only used during the first level of contention, and waiting is done on a
60 * per-thread lock basis so that order is guaranteed once the thread goes into a suspended monitor state.
62 * Not all of the methods of the {@link java.util.concurrent.BlockingQueue} are implemented.
64 * @param <E> Type of element in the queue
67 public class MultiLockFairBlockingQueue<E> implements BlockingQueue<E> {
69 final int LOCK_COUNT = Runtime.getRuntime().availableProcessors();
71 final AtomicInteger putQueue = new AtomicInteger(0);
72 final AtomicInteger pollQueue = new AtomicInteger(0);
74 public int getNextPut() {
75 int idx = Math.abs(putQueue.incrementAndGet()) % LOCK_COUNT;
79 public int getNextPoll() {
80 int idx = Math.abs(pollQueue.incrementAndGet()) % LOCK_COUNT;
84 * Phase one entry locks. In order to give out
85 * per-thread locks for the waiting phase, we hold
86 * a phase one lock during the contention period.
88 private final ReentrantLock[] locks = new ReentrantLock[LOCK_COUNT];
91 * All the objects in the pool are stored in simple linked lists, one list per lock.
93 final LinkedList<E>[] items;
96 * All threads waiting for an object are stored in linked lists, one list per lock.
98 final LinkedList<ExchangeCountDownLatch<E>>[] waiters;
101 * Creates a new fair blocking queue.
103 @SuppressWarnings("unchecked") // Can't create arrays of generic types
104 public MultiLockFairBlockingQueue() {
105 items = new LinkedList[LOCK_COUNT];
106 waiters = new LinkedList[LOCK_COUNT];
107 for (int i=0; i<LOCK_COUNT; i++) {
108 items[i] = new LinkedList<>();
109 waiters[i] = new LinkedList<>();
110 locks[i] = new ReentrantLock(false);
114 //------------------------------------------------------------------
115 // USED BY CONPOOL IMPLEMENTATION
116 //------------------------------------------------------------------
118 * Will always return true, queue is unbounded.
122 public boolean offer(E e) {
123 int idx = getNextPut();
124 //during the offer, we will grab the main lock
125 final ReentrantLock lock = this.locks[idx];
127 ExchangeCountDownLatch<E> c = null;
129 //check to see if threads are waiting for an object
130 if (waiters[idx].size() > 0) {
131 //if threads are waiting grab the latch for that thread
132 c = waiters[idx].poll();
133 //give the object to the thread instead of adding it to the pool
136 //we always add first, so that the most recently used object will be given out
137 items[idx].addFirst(e);
142 //if we exchanged an object with another thread, wake it up.
143 if (c!=null) c.countDown();
144 //we have an unbounded queue, so always return true
149 * Will never time out, as it invokes the {@link #offer(Object)} method.
150 * Once the lock has been acquired, the offer always succeeds because the queue is unbounded.
154 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
159 * Fair retrieval of an object in the queue.
160 * Objects are returned in the order the threads requested them.
164 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
165 int idx = getNextPoll();
167 final ReentrantLock lock = this.locks[idx];
169 //acquire the global lock until we know what to do
171 //check to see if we have objects
172 result = items[idx].poll();
173 if (result==null && timeout>0) {
174 //the queue is empty we will wait for an object
175 ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
176 //add to the bottom of the wait list
177 waiters[idx].addLast(c);
178 //unlock the global lock
180 //wait for the specified timeout
181 if (!c.await(timeout, unit)) {
182 //if we timed out, remove ourselves from the waitlist
184 waiters[idx].remove(c);
187 //return the item we received, can be null if we timed out
188 result = c.getItem();
190 //we have an object, release
194 if (lock.isHeldByCurrentThread()) {
202 * Request an item from the queue asynchronously
203 * @return - a future pending the result from the queue poll request
205 public Future<E> pollAsync() {
206 int idx = getNextPoll();
207 Future<E> result = null;
208 final ReentrantLock lock = this.locks[idx];
210 //grab the global lock
212 //check to see if we have objects in the queue
213 E item = items[idx].poll();
215 //queue is empty, add ourselves as waiters
216 ExchangeCountDownLatch<E> c = new ExchangeCountDownLatch<>(1);
217 waiters[idx].addLast(c);
218 //return a future that will wait for the object
219 result = new ItemFuture<>(c);
221 //return a future with the item
222 result = new ItemFuture<>(item);
234 public boolean remove(Object e) {
235 for (int idx=0; idx<LOCK_COUNT; idx++) {
236 final ReentrantLock lock = this.locks[idx];
239 boolean result = items[idx].remove(e);
240 if (result) return result;
254 for (int idx=0; idx<LOCK_COUNT; idx++) {
255 size += items[idx].size();
264 public Iterator<E> iterator() {
265 return new FairIterator();
273 int idx = getNextPoll();
274 final ReentrantLock lock = this.locks[idx];
277 return items[idx].poll();
287 public boolean contains(Object e) {
288 for (int idx=0; idx<LOCK_COUNT; idx++) {
289 boolean result = items[idx].contains(e);
290 if (result) return result;
296 //------------------------------------------------------------------
297 // NOT USED BY CONPOOL IMPLEMENTATION
298 //------------------------------------------------------------------
303 public boolean add(E e) {
309 * @throws UnsupportedOperationException - this operation is not supported
312 public int drainTo(Collection<? super E> c, int maxElements) {
313 throw new UnsupportedOperationException("int drainTo(Collection<? super E> c, int maxElements)");
318 * @throws UnsupportedOperationException - this operation is not supported
321 public int drainTo(Collection<? super E> c) {
322 return drainTo(c,Integer.MAX_VALUE);
329 public void put(E e) throws InterruptedException {
337 public int remainingCapacity() {
338 return Integer.MAX_VALUE - size();
345 public E take() throws InterruptedException {
346 return this.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
353 public boolean addAll(Collection<? extends E> c) {
354 Iterator<? extends E> i = c.iterator();
355 while (i.hasNext()) {
364 * @throws UnsupportedOperationException - this operation is not supported
367 public void clear() {
368 throw new UnsupportedOperationException("void clear()");
374 * @throws UnsupportedOperationException - this operation is not supported
377 public boolean containsAll(Collection<?> c) {
378 throw new UnsupportedOperationException("boolean containsAll(Collection<?> c)");
385 public boolean isEmpty() {
391 * @throws UnsupportedOperationException - this operation is not supported
394 public boolean removeAll(Collection<?> c) {
395 throw new UnsupportedOperationException("boolean removeAll(Collection<?> c)");
400 * @throws UnsupportedOperationException - this operation is not supported
403 public boolean retainAll(Collection<?> c) {
404 throw new UnsupportedOperationException("boolean retainAll(Collection<?> c)");
409 * @throws UnsupportedOperationException - this operation is not supported
412 public Object[] toArray() {
413 throw new UnsupportedOperationException("Object[] toArray()");
418 * @throws UnsupportedOperationException - this operation is not supported
421 public <T> T[] toArray(T[] a) {
422 throw new UnsupportedOperationException("<T> T[] toArray(T[] a)");
427 * @throws UnsupportedOperationException - this operation is not supported
431 throw new UnsupportedOperationException("E element()");
436 * @throws UnsupportedOperationException - this operation is not supported
440 throw new UnsupportedOperationException("E peek()");
445 * @throws UnsupportedOperationException - this operation is not supported
449 throw new UnsupportedOperationException("E remove()");
454 //------------------------------------------------------------------
455 // Non-cancellable Future used to check whether a connection has been made available
456 //------------------------------------------------------------------
457 protected class ItemFuture<T> implements Future<T> {
458 protected volatile T item = null;
459 protected volatile ExchangeCountDownLatch<T> latch = null;
460 protected volatile boolean canceled = false;
462 public ItemFuture(T item) {
466 public ItemFuture(ExchangeCountDownLatch<T> latch) {
471 public boolean cancel(boolean mayInterruptIfRunning) {
472 return false; //don't allow cancel for now
476 public T get() throws InterruptedException, ExecutionException {
479 } else if (latch!=null) {
481 return latch.getItem();
483 throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
488 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
491 } else if (latch!=null) {
492 boolean timedout = !latch.await(timeout, unit);
493 if (timedout) throw new TimeoutException();
494 else return latch.getItem();
496 throw new ExecutionException("ItemFuture incorrectly instantiated. Bug in the code?", new Exception());
501 public boolean isCancelled() {
506 public boolean isDone() {
507 return (item!=null || latch.getItem()!=null);
512 //------------------------------------------------------------------
513 // Count down latch that can be used to exchange information
514 //------------------------------------------------------------------
515 protected class ExchangeCountDownLatch<T> extends CountDownLatch {
516 protected volatile T item;
517 public ExchangeCountDownLatch(int i) {
523 public void setItem(T item) {
528 //------------------------------------------------------------------
529 // Iterator safe from concurrent modification exceptions
530 //------------------------------------------------------------------
531 protected class FairIterator implements Iterator<E> {
536 @SuppressWarnings("unchecked") // Can't create arrays of generic types
537 public FairIterator() {
538 ArrayList<E> list = new ArrayList<>(MultiLockFairBlockingQueue.this.size());
539 for (int idx=0; idx<LOCK_COUNT; idx++) {
540 final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx];
543 elements = (E[]) new Object[MultiLockFairBlockingQueue.this.items[idx].size()];
544 MultiLockFairBlockingQueue.this.items[idx].toArray(elements);
551 elements = (E[]) new Object[list.size()];
552 list.toArray(elements);
555 public boolean hasNext() {
556 return index<elements.length;
562 throw new NoSuchElementException();
564 element = elements[index++];
569 public void remove() {
570 for (int idx=0; idx<LOCK_COUNT; idx++) {
571 final ReentrantLock lock = MultiLockFairBlockingQueue.this.locks[idx];
574 boolean result = MultiLockFairBlockingQueue.this.items[idx].remove(elements[index]);