2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2016 - 2017 ONAP
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.ccsdk.sli.core.dblib;
23 import java.io.PrintWriter;
24 import java.sql.Connection;
25 import java.sql.SQLDataException;
26 import java.sql.SQLException;
27 import java.sql.SQLFeatureNotSupportedException;
28 import java.sql.SQLIntegrityConstraintViolationException;
29 import java.sql.SQLSyntaxErrorException;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.Comparator;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.LinkedList;
36 import java.util.Observable;
37 import java.util.PriorityQueue;
38 import java.util.Properties;
39 import java.util.Queue;
41 import java.util.Timer;
42 import java.util.TimerTask;
43 import java.util.concurrent.ConcurrentLinkedQueue;
44 import java.util.concurrent.atomic.AtomicBoolean;
46 import javax.sql.DataSource;
47 import javax.sql.rowset.CachedRowSet;
49 import org.apache.tomcat.jdbc.pool.PoolExhaustedException;
50 import org.onap.ccsdk.sli.core.dblib.config.DbConfigPool;
51 import org.onap.ccsdk.sli.core.dblib.factory.AbstractDBResourceManagerFactory;
52 import org.onap.ccsdk.sli.core.dblib.factory.AbstractResourceManagerFactory;
53 import org.onap.ccsdk.sli.core.dblib.factory.DBConfigFactory;
54 import org.onap.ccsdk.sli.core.dblib.pm.PollingWorker;
55 import org.onap.ccsdk.sli.core.dblib.pm.SQLExecutionMonitor;
57 import com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
63 * @version $Revision: 1.15 $
65 * Author Date Comments
66 * ============== ======== ====================================================
69 public class DBResourceManager implements DataSource, DataAccessor, DBResourceObserver, DbLibService {
// Class-wide state: the logger, lifecycle flags, the data-source collections, and the
// settings that the constructor loads from configuration.
// NOTE(review): this view of the file is missing lines (the embedded numbering skips), so
// some declarations below are only partially visible.
// NOTE(review): LOGGER is never reassigned in the visible code — consider declaring it final.
70 private static Logger LOGGER = LoggerFactory.getLogger(DBResourceManager.class);
// Set to true by cleanUp().
72 transient boolean terminating = false;
// Pause handed to Thread.sleep by RecoveryMgr; overwritten from configuration in the constructor.
73 transient protected long retryInterval = 10000L;
// Selects between the recovery and no-recovery code paths; also gates update() and isMonitorDbResponse().
74 transient boolean recoveryMode = true;
// Passed as the "flipper" argument by getPreferredDSName() and getMasterName().
76 protected final AtomicBoolean dsSelector = new AtomicBoolean();
78 // Queue<CachedDataSource> dsQueue = new ConcurrentLinkedQueue<CachedDataSource>();
// Data sources currently considered usable; the head of the queue is reported as "active".
// Ordering comes from the comparator below, whose body is not fully visible in this view.
// NOTE(review): java.util.PriorityQueue is unsynchronized, and RecoveryMgr adds to this queue
// from its own thread — confirm that callers are otherwise serialized.
79 Queue<CachedDataSource> dsQueue = new PriorityQueue<CachedDataSource>(4, new Comparator<CachedDataSource>(){
82 public int compare(CachedDataSource left, CachedDataSource right) {
89 } catch (Throwable e) {
// Data sources taken out of dsQueue and awaiting recovery by RecoveryMgr.
96 protected final Set<CachedDataSource> broken = Collections.synchronizedSet(new HashSet<CachedDataSource>());
97 protected final Object monitor = new Object();
// Properties handed to the constructor; config() reads them again to build the DB configuration.
98 protected final Properties configProps;
// The RecoveryMgr watch thread created in the constructor.
99 protected final Thread worker;
// Upper bound passed to worker.join() in cleanUp().
101 protected final long terminationTimeOut;
// Monitoring settings; config() copies the four long values onto every initialized data source.
102 protected final boolean monitorDbResponse;
103 protected final long monitoringInterval;
104 protected final long monitoringInitialDelay;
105 protected final long expectedCompletionTime;
106 protected final long unprocessedFailoverThreshold;
// Builds a manager from the supplied properties: stores them, reads the retry, recovery,
// termination and monitoring settings (each getter is given an explicit default), creates the
// PollingWorker instance, and prepares the RecoveryMgr watch thread as a named daemon thread.
// NOTE(review): whether the watch thread is started here cannot be seen in this view — confirm.
108 public DBResourceManager(Properties props){
109 this.configProps = props;
111 // get retry interval value
112 retryInterval = getLongFromProperties(props, "org.onap.dblib.connection.retry", 10000L);
114 // get recovery mode flag
115 recoveryMode = getBooleanFromProperties(props, "org.onap.dblib.connection.recovery", true);
// NOTE(review): the condition guarding the next two statements is not visible in this view.
118 recoveryMode = false;
119 LOGGER.info("Recovery Mode disabled");
121 // get time out value for thread cleanup
122 terminationTimeOut = getLongFromProperties(props, "org.onap.dblib.termination.timeout", 300000L);
123 // get properties for monitoring
124 monitorDbResponse = getBooleanFromProperties(props, "org.onap.dblib.connection.monitor", false);
125 monitoringInterval = getLongFromProperties(props, "org.onap.dblib.connection.monitor.interval", 1000L);
126 monitoringInitialDelay = getLongFromProperties(props, "org.onap.dblib.connection.monitor.startdelay", 5000L);
127 expectedCompletionTime = getLongFromProperties(props, "org.onap.dblib.connection.monitor.expectedcompletiontime", 5000L);
128 unprocessedFailoverThreshold = getLongFromProperties(props, "org.onap.dblib.connection.monitor.unprocessedfailoverthreshold", 3L);
130 // initialize performance monitor
131 PollingWorker.createInistance(props);
133 // initialize recovery thread
134 worker = new RecoveryMgr();
135 worker.setName("DBResourcemanagerWatchThread");
136 worker.setDaemon(true);
// Creates the configured data sources through the factory matching the DB config type and
// registers every initialized one with this manager.
// Throws Exception when the factory returns null or an empty array.
// NOTE(review): the visible lines read this.configProps and never the ctx parameter —
// confirm that ctx is intentionally unused.
140 private void config(Properties ctx) throws Exception {
142 DbConfigPool dbConfig = DBConfigFactory.createConfig(this.configProps);
145 AbstractResourceManagerFactory factory = AbstractDBResourceManagerFactory.getFactory(dbConfig.getType());
146 if(LOGGER.isInfoEnabled()){
147 LOGGER.info("Default DB config is : " + dbConfig.getType());
148 LOGGER.info("Using factory : " + factory.getClass().getName());
150 CachedDataSource[] cachedDS = factory.initDBResourceManager(dbConfig, this);
151 if(cachedDS == null || cachedDS.length == 0) {
152 LOGGER.error("Initialization of CachedDataSources failed. No instance was created.");
153 throw new Exception("Failed to initialize DB Library. No data source was created.");
// Null or uninitialized entries are skipped. Each remaining source is placed in dsQueue or
// broken by setDataSource, receives the monitoring settings, and gets this manager as observer.
156 for(int i=0; i<cachedDS.length; i++){
157 if(cachedDS[i] != null && cachedDS[i].isInitialized()){
158 setDataSource(cachedDS[i]);
159 cachedDS[i].setInterval(monitoringInterval);
160 cachedDS[i].setInitialDelay(monitoringInitialDelay);
161 cachedDS[i].setExpectedCompletionTime(expectedCompletionTime);
162 cachedDS[i].setUnprocessedFailoverThreshold(unprocessedFailoverThreshold);
163 cachedDS[i].addObserver(this);
// NOTE(review): the handling inside this catch is not visible in this view.
167 } catch(Exception exc){
// Reads the named property and parses it as a long, starting from defaultValue.
// A value that is not a valid long is reported with a warning.
// NOTE(review): the return statement is not visible here; presumably tmpLongValue is returned,
// which would leave the default in place after a parse failure — confirm.
172 private long getLongFromProperties(Properties props, String property, long defaultValue)
175 long tmpLongValue = defaultValue;
177 value = (String)props.getProperty(property);
179 tmpLongValue = Long.parseLong(value);
181 } catch(NumberFormatException exc) {
182 if(LOGGER.isWarnEnabled()){
183 LOGGER.warn("'"+property+"'=" + value+" is invalid. It should be a numeric value");
185 } catch(Exception exc) {
// Reads the named property and parses it with Boolean.parseBoolean, starting from defaultValue.
// NOTE(review): the return statement is not visible here; presumably tmpValue is returned — confirm.
191 private boolean getBooleanFromProperties(Properties props, String property, boolean defaultValue)
193 boolean tmpValue = defaultValue;
197 value = (String)props.getProperty(property);
199 tmpValue = Boolean.parseBoolean(value);
// NOTE(review): Boolean.parseBoolean does not throw NumberFormatException (any text other than
// "true", ignoring case, yields false), so this handler and its warning look unreachable
// unless another call inside the try can raise it — confirm.
201 } catch(NumberFormatException exc) {
202 if(LOGGER.isWarnEnabled()){
203 LOGGER.warn("'"+property+"'=" + value+" is invalid. It should be a boolean value");
205 } catch(Exception exc) {
// Observer callback. Acts only when the observable is a SQLExecutionMonitor whose parent is the
// CachedDataSource at the head of dsQueue; then, if recovery mode is on and more than one data
// source is queued, the head is handed to handleGetConnectionException with the notification
// text as the reason.
// NOTE(review): data.toString() assumes data is non-null — confirm against the notifier.
212 public void update(Observable observable, Object data) {
213 // if observable is active and there is a standby available, switch
214 if(observable instanceof SQLExecutionMonitor)
216 SQLExecutionMonitor monitor = (SQLExecutionMonitor)observable;
217 if(monitor.getParent() instanceof CachedDataSource)
219 CachedDataSource dataSource = (CachedDataSource)monitor.getParent();
// Identity comparison: only the current head of the queue can be switched away from.
220 if(dataSource == dsQueue.peek())
222 if(recoveryMode && dsQueue.size() > 1){
223 handleGetConnectionException(dataSource, new Exception(data.toString()));
// Passes the current head of dsQueue to handleGetConnectionException with a synthetic
// "test" exception.
// NOTE(review): peek() returns null when the queue is empty; confirm the handler tolerates that.
230 public void testForceRecovery()
232 CachedDataSource active = (CachedDataSource) this.dsQueue.peek();
233 handleGetConnectionException(active, new Exception("test"));
// Background thread that periodically re-tests the data sources held in `broken` and moves the
// ones that respond back into dsQueue.
// NOTE(review): the loop header and its exit condition are not visible in this view.
236 class RecoveryMgr extends Thread {
// Pause between recovery passes.
// NOTE(review): the interrupt is swallowed by the empty catch; consider restoring the interrupt
// flag with Thread.currentThread().interrupt().
242 Thread.sleep(retryInterval);
243 } catch (InterruptedException e1) { }
244 CachedDataSource brokenSource = null;
246 if (!broken.isEmpty()) {
// Work on an array snapshot so `broken` can be modified while iterating.
247 CachedDataSource[] sourceArray = broken.toArray(new CachedDataSource[0]);
248 for (int i = 0; i < sourceArray.length; i++)
250 brokenSource = sourceArray[i];
// cleanUp() adds a TerminatingCachedDataSource to `broken`; the action taken on seeing one is
// not visible here (presumably it ends this thread — confirm).
251 if (brokenSource instanceof TerminatingCachedDataSource)
// A source whose connection test passes leaves `broken` and rejoins dsQueue.
253 if (resetConnectionPool(brokenSource)) {
254 broken.remove(brokenSource);
255 brokenSource.blockImmediateOffLine();
256 dsQueue.add(brokenSource);
257 LOGGER.info("DataSource <"
258 + brokenSource.getDbConnectionName()
264 } catch (Exception exc) {
265 LOGGER.warn(exc.getMessage());
// Make sure a source that failed mid-recovery is still tracked as broken.
266 if(brokenSource != null){
268 if(!broken.contains(brokenSource))
269 broken.add(brokenSource);
271 } catch (Exception e1) { }
275 LOGGER.info("DBResourceManager.RecoveryMgr <"+this.toString() +"> terminated." );
// Returns the result of dataSource.testConnection(); a thrown exception is logged at info level.
// NOTE(review): the value returned after an exception is not visible here (presumably false).
278 private boolean resetConnectionPool(CachedDataSource dataSource){
280 return dataSource.testConnection();
281 } catch (Exception exc) {
282 LOGGER.info("DataSource <" + dataSource.getDbConnectionName() + "> resetCache failed with error: "+ exc.getMessage());
289 * @see org.onap.ccsdk.sli.core.dblib.DbLibService#getData(java.lang.String, java.util.ArrayList, java.lang.String)
// Runs a query and returns the rows as a CachedRowSet. The String arguments are copied into an
// ArrayList<Object> (a null or empty list yields an empty copy) and the call is delegated to
// requestDataWithRecovery or requestDataNoRecovery.
// NOTE(review): the condition choosing between the two is not visible here (presumably recoveryMode).
292 public CachedRowSet getData(String statement, ArrayList<String> arguments, String preferredDS) throws SQLException {
293 ArrayList<Object> newList=new ArrayList<Object>();
294 if(arguments != null && !arguments.isEmpty()) {
295 newList.addAll(arguments);
298 return requestDataWithRecovery(statement, newList, preferredDS);
300 return requestDataNoRecovery(statement, newList, preferredDS);
// Runs a query against the queued data sources in turn until one returns a result.
// Works on a LinkedList snapshot of dsQueue; throws DBLibException when the snapshot is empty.
// A failing source is logged and passed to handleGetConnectionException. Once the snapshot is
// exhausted the last recorded failure is rethrown or repackaged.
// NOTE(review): several lines are missing from this view, including the bodies of the three
// specific SQL exception handlers and the assignment of lastException.
303 private CachedRowSet requestDataWithRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
304 Throwable lastException = null;
305 CachedDataSource active = null;
307 // test if there are any connection pools available
308 LinkedList<CachedDataSource> sources = new LinkedList<CachedDataSource>(this.dsQueue);
309 if(sources.isEmpty()){
310 LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
311 throw new DBLibException("No active DB connection pools are available in RequestDataWithRecovery call.");
// When a preferred source is named and it is not first in the snapshot, the whole snapshot is
// reversed; that brings the preferred source to the front only if it was last.
313 if(preferredDS != null && !sources.peek().getDbConnectionName().equals(preferredDS)) {
314 Collections.reverse(sources);
318 // loop through available data sources to retrieve data.
319 while(!sources.isEmpty())
321 active = sources.peek();
323 long time = System.currentTimeMillis();
325 if(!active.isFabric()) {
326 CachedDataSource master = findMaster();
// Drop the candidate from the snapshot before using it, so a failure moves on to the next one.
332 sources.remove(active);
333 return active.getData(statement, arguments);
334 } catch(SQLDataException exc){
336 } catch(SQLSyntaxErrorException exc){
338 } catch(SQLIntegrityConstraintViolationException exc){
340 } catch(Throwable exc){
// Pick the most specific text available for the alarm: the exception message, then the
// cause's message, then the exception class name.
342 String message = exc.getMessage();
343 if(message == null) {
344 if(exc.getCause() != null) {
345 message = exc.getCause().getMessage();
348 message = exc.getClass().getName();
350 LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
351 handleGetConnectionException(active, exc);
353 if(LOGGER.isDebugEnabled()){
354 time = (System.currentTimeMillis() - time);
355 LOGGER.debug("getData processing time : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
359 if(lastException instanceof SQLException){
360 throw (SQLException)lastException;
362 // repackage the exception
363 // you are here because either you run out of available data sources
364 // or the last exception was not of SQLException type.
365 // repackage the exception
366 if(lastException == null) {
367 throw new DBLibException("The operation timed out while waiting to acquire a new connection." );
// The wrapper carries the original message and stack trace; an SQLException cause, when
// present, is thrown in its place.
369 SQLException exception = new DBLibException(lastException.getMessage());
370 exception.setStackTrace(lastException.getStackTrace());
371 if(lastException.getCause() instanceof SQLException) {
372 throw (SQLException)lastException.getCause();
// Runs a query against the head of dsQueue only, with no failover to another data source.
// Throws DBLibException when dsQueue is empty. An SQLException from the data source is
// rethrown as-is; any other Throwable is repackaged as a DBLibException with the same
// message and stack trace.
// NOTE(review): preferredDS is not read in the visible lines — confirm it is intentionally ignored.
378 private CachedRowSet requestDataNoRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
379 if(dsQueue.isEmpty()){
380 LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
381 throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call.");
383 CachedDataSource active = (CachedDataSource) this.dsQueue.peek();
384 long time = System.currentTimeMillis();
// For a non-fabric source the master is looked up first; what is done with it is not visible here.
386 if(!active.isFabric()) {
387 CachedDataSource master = findMaster();
391 return active.getData(statement, arguments);
392 // } catch(SQLDataException exc){
394 } catch(Throwable exc){
395 String message = exc.getMessage();
397 message = exc.getClass().getName();
398 LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
399 if(exc instanceof SQLException)
400 throw (SQLException)exc;
402 DBLibException excptn = new DBLibException(exc.getMessage());
403 excptn.setStackTrace(exc.getStackTrace());
407 if(LOGGER.isDebugEnabled()){
408 time = (System.currentTimeMillis() - time);
409 LOGGER.debug(">> getData : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
416 * @see org.onap.ccsdk.sli.core.dblib.DbLibService#writeData(java.lang.String, java.util.ArrayList, java.lang.String)
// Executes a write statement. The String arguments are copied into an ArrayList<Object>
// (a null or empty list yields an empty copy) and the call is delegated to writeDataNoRecovery.
419 public boolean writeData(String statement, ArrayList<String> arguments, String preferredDS) throws SQLException
421 ArrayList<Object> newList=new ArrayList<Object>();
422 if(arguments != null && !arguments.isEmpty()) {
423 newList.addAll(arguments);
426 return writeDataNoRecovery(statement, newList, preferredDS);
// Scans an array snapshot of dsQueue for a data source that does not report itself as a slave,
// and logs a warning when none is found.
// NOTE(review): the visible code removes the master from dsQueue; the statements that assign
// `master`, possibly re-add it, and return are not visible in this view — confirm the queue is
// left consistent.
429 CachedDataSource findMaster() throws PoolExhaustedException, MySQLNonTransientConnectionException {
430 CachedDataSource master = null;
431 CachedDataSource[] dss = this.dsQueue.toArray(new CachedDataSource[0]);
432 for(int i=0; i<dss.length; i++) {
433 if(!dss[i].isSlave()) {
436 dsQueue.remove(master);
443 LOGGER.warn("MASTER not found.");
// Executes a write statement against the head of dsQueue. Throws DBLibException when dsQueue
// is empty. An SQLException with error code 1290 and SQL state HY000 (the "read-only" case named
// in the comment below) removes the source from dsQueue and re-arms the loop for another pass.
// Other failures are rethrown, or repackaged as DBLibException.
// NOTE(review): lines are missing from this view, including how retryAllowed is checked and how
// `active` is re-selected before the retry.
449 private boolean writeDataNoRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
450 if(dsQueue.isEmpty()){
451 LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
452 throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call.");
455 boolean initialRequest = true;
456 boolean retryAllowed = true;
457 CachedDataSource active = (CachedDataSource) this.dsQueue.peek();
458 long time = System.currentTimeMillis();
// initialRequest doubles as the loop condition: it is cleared on entry and set again only
// when a retry is requested.
459 while(initialRequest) {
460 initialRequest = false;
462 if(!active.isFabric()) {
463 CachedDataSource master = findMaster();
469 return active.writeData(statement, arguments);
470 } catch(Throwable exc){
471 String message = exc.getMessage();
473 message = exc.getClass().getName();
474 LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
475 if(exc instanceof SQLException) {
476 SQLException sqlExc = SQLException.class.cast(exc);
477 // handle read-only exception
478 if(sqlExc.getErrorCode() == 1290 && "HY000".equals(sqlExc.getSQLState())) {
479 LOGGER.warn("retrying due to: " + sqlExc.getMessage());
480 dsQueue.remove(active);
483 retryAllowed = false;
484 initialRequest = true;
488 throw (SQLException)exc;
490 DBLibException excptn = new DBLibException(exc.getMessage());
491 excptn.setStackTrace(exc.getStackTrace());
495 if(LOGGER.isDebugEnabled()){
496 time = (System.currentTimeMillis() - time);
497 LOGGER.debug("writeData processing time : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
// Files a data source according to its connection test: a passing source joins dsQueue.
// NOTE(review): the `else` line is not visible here; presumably only a failing source is added
// to `broken` — confirm.
504 private void setDataSource(CachedDataSource dataSource) {
505 if(dataSource.testConnection(true)){
506 this.dsQueue.add(dataSource);
508 this.broken.add(dataSource);
// Returns a DBLibConnection wrapping a connection from the selected data source.
// Throws DBLibException when dsQueue is empty; pool exhaustion and MySQL non-transient
// connection failures surface as NoAvailableConnectionsException. Other failures pass through
// handleGetConnectionException before being rethrown or repackaged.
// NOTE(review): the surrounding loop/try structure and the assignment of lastException are not
// visible in this view.
512 public Connection getConnection() throws SQLException {
513 Throwable lastException = null;
514 CachedDataSource active = null;
516 if(dsQueue.isEmpty()){
517 throw new DBLibException("No active DB connection pools are available in GetConnection call.");
// Start from the head of the queue; a master found by findMaster() may replace it (the
// assignment itself is not visible here).
521 active = dsQueue.peek();
522 CachedDataSource tmpActive = findMaster();
523 if(tmpActive != null) {
526 return new DBLibConnection(active.getConnection(), active);
527 } catch(javax.sql.rowset.spi.SyncFactoryException exc){
528 LOGGER.debug("Free memory (bytes): " + Runtime.getRuntime().freeMemory());
529 LOGGER.warn("CLASSPATH issue. Allowing retry", exc);
531 } catch(PoolExhaustedException exc) {
532 throw new NoAvailableConnectionsException(exc);
533 } catch(MySQLNonTransientConnectionException exc){
534 throw new NoAvailableConnectionsException(exc);
535 } catch(Exception exc){
538 handleGetConnectionException(active, exc);
// NOTE(review): if this handler belongs to the same try as the dedicated
// MySQLNonTransientConnectionException catch above, this instanceof test can never be true.
540 if(exc instanceof MySQLNonTransientConnectionException) {
541 throw new NoAvailableConnectionsException(exc);
542 } if(exc instanceof SQLException) {
543 throw (SQLException)exc;
545 DBLibException excptn = new DBLibException(exc.getMessage());
546 excptn.setStackTrace(exc.getStackTrace());
550 } catch (Throwable trwb) {
551 DBLibException excptn = new DBLibException(trwb.getMessage());
552 excptn.setStackTrace(trwb.getStackTrace());
555 if(LOGGER.isDebugEnabled()){
560 if(lastException instanceof SQLException){
561 throw (SQLException)lastException;
563 // repackage the exception
564 if(lastException == null) {
565 throw new DBLibException("The operation timed out while waiting to acquire a new connection." );
// The wrapper carries the original message and stack trace; an SQLException cause, when
// present, is thrown in its place.
567 SQLException exception = new DBLibException(lastException.getMessage());
568 exception.setStackTrace(lastException.getStackTrace());
569 if(lastException.getCause() instanceof SQLException) {
570 // exception.setNextException((SQLException)lastException.getCause());
571 throw (SQLException)lastException.getCause();
// Returns a connection obtained with explicit credentials from the selected data source.
// Throws DBLibException when dsQueue is empty. A failure is passed to
// handleGetConnectionException, then rethrown if it is an SQLException or repackaged as a
// DBLibException otherwise.
// NOTE(review): unlike the no-argument overload, the visible return hands back the raw
// connection rather than a DBLibConnection wrapper — confirm this is intended.
577 public Connection getConnection(String username, String password)
578 throws SQLException {
579 CachedDataSource active = null;
581 if(dsQueue.isEmpty()){
582 throw new DBLibException("No active DB connection pools are available in GetConnection call.");
// Start from the head of the queue; a master found by findMaster() may replace it (the
// assignment itself is not visible here).
587 active = dsQueue.peek();
588 CachedDataSource tmpActive = findMaster();
589 if(tmpActive != null) {
592 return active.getConnection(username, password);
593 } catch(Throwable exc){
595 handleGetConnectionException(active, exc);
597 if(exc instanceof SQLException)
598 throw (SQLException)exc;
600 DBLibException excptn = new DBLibException(exc.getMessage());
601 excptn.setStackTrace(exc.getStackTrace());
608 throw new DBLibException("No connections available in DBResourceManager in GetConnection call.");
// Takes a failing data source out of service: removes it from dsQueue, records it in `broken`
// for RecoveryMgr to retry, and logs which source (if any) is now at the head of the queue.
// A source that reports it cannot be taken offline is only logged as an error.
// NOTE(review): the control flow after the canTakeOffLine() check (presumably an early return)
// and the body of the final catch are not visible in this view.
611 private void handleGetConnectionException(CachedDataSource source, Throwable exc) {
613 if(!source.canTakeOffLine())
615 LOGGER.error("Could not switch due to blocking");
// NOTE(review): `removed` is not read in the visible lines.
619 boolean removed = dsQueue.remove(source);
620 if(!broken.contains(source))
622 if(broken.add(source))
624 LOGGER.warn("DB Recovery: DataSource <" + source.getDbConnectionName() + "> put in the recovery mode. Reason : " + exc.getMessage());
626 LOGGER.warn("Error putting DataSource <" +source.getDbConnectionName()+ "> in recovery mode.");
629 LOGGER.info("DB Recovery: DataSource <" + source.getDbConnectionName() + "> already in recovery queue");
633 if(!dsQueue.isEmpty())
635 LOGGER.warn("DB DataSource <" + dsQueue.peek().getDbConnectionName() + "> became active");
638 } catch (Exception e) {
// Shuts the manager down: walks the queued data sources, marks the manager as terminating,
// adds a TerminatingCachedDataSource to `broken` (the type RecoveryMgr checks for), and waits
// up to terminationTimeOut for the watch thread to finish.
// NOTE(review): the per-data-source action inside the loop is not visible in this view.
643 public void cleanUp() {
644 for(Iterator<CachedDataSource> it=dsQueue.iterator();it.hasNext();){
645 CachedDataSource cds = (CachedDataSource)it.next();
651 this.terminating = true;
655 broken.add( new TerminatingCachedDataSource(null));
656 } catch(Exception exc){
657 LOGGER.error("Waiting for Worker to stop", exc);
// join() with a timeout returns even if the thread is still running, so the thread state is
// logged rather than assumed.
660 worker.join(terminationTimeOut);
661 LOGGER.info("DBResourceManager.RecoveryMgr <"+worker.toString() +"> termination was successful: " + worker.getState());
662 } catch(Exception exc){
663 LOGGER.error("Waiting for Worker thread to terminate ", exc);
// Static factory: constructs a manager from the properties and runs config() on it.
// Propagates any Exception thrown by config().
// NOTE(review): the return statement is not visible here (presumably returns dbmanager).
667 public static DBResourceManager create(Properties props) throws Exception {
668 DBResourceManager dbmanager = new DBResourceManager(props);
669 dbmanager.config(props);
// Delegates to the data source at the head of dsQueue.
// NOTE(review): peek() returns null on an empty queue, which would raise NullPointerException here.
673 public PrintWriter getLogWriter() throws SQLException {
674 return ((CachedDataSource)this.dsQueue.peek()).getLogWriter();
// Delegates to the data source at the head of dsQueue.
// NOTE(review): peek() returns null on an empty queue, which would raise NullPointerException here.
677 public int getLoginTimeout() throws SQLException {
678 return ((CachedDataSource)this.dsQueue.peek()).getLoginTimeout();
// Applies the writer to the data source at the head of dsQueue only; other queued or broken
// sources are not touched by the visible code.
// NOTE(review): peek() returns null on an empty queue, which would raise NullPointerException here.
681 public void setLogWriter(PrintWriter out) throws SQLException {
682 ((CachedDataSource)this.dsQueue.peek()).setLogWriter(out);
// Applies the timeout to the data source at the head of dsQueue only; other queued or broken
// sources are not touched by the visible code.
// NOTE(review): peek() returns null on an empty queue, which would raise NullPointerException here.
685 public void setLoginTimeout(int seconds) throws SQLException {
686 ((CachedDataSource)this.dsQueue.peek()).setLoginTimeout(seconds);
// Debug-only dump: logs the sizes of dsQueue and `broken`, plus the name of the data source at
// the head of dsQueue when there is one. Does nothing unless debug logging is enabled.
689 public void displayState(){
690 if(LOGGER.isDebugEnabled()){
691 LOGGER.debug("POOLS : Active = "+dsQueue.size() + ";\t Broken = "+broken.size());
692 CachedDataSource current = (CachedDataSource)dsQueue.peek();
693 if(current != null) {
694 LOGGER.debug("POOL : Active name = \'"+current.getDbConnectionName()+ "\'");
700 * @see org.onap.ccsdk.sli.core.dblib.DbLibService#isActive()
// True when at least one data source is currently held in dsQueue.
703 public boolean isActive() {
704 return this.dsQueue.size()>0;
// One-line summary: number of data sources in dsQueue and number in `broken`, tab-separated.
707 public String getActiveStatus(){
708 return "Connected: " + dsQueue.size()+"\tIn-recovery: "+broken.size();
// Builds a status report with one entry per data source: its name and whether it is
// "in recovery" (present in `broken`), "active" (head of dsQueue) or "standby" (elsewhere in
// dsQueue). The visible code has an HTML table-row variant and a plain-text variant.
// NOTE(review): the branch on htmlFormat is not visible in this view, and only dsQueue is
// visibly copied into `list`; confirm broken sources are added too, otherwise the
// "in recovery" branches cannot match.
711 public String getDBStatus(boolean htmlFormat) {
712 StringBuilder buffer = new StringBuilder();
714 ArrayList<CachedDataSource> list = new ArrayList<CachedDataSource>();
715 list.addAll(dsQueue);
// HTML variant: a header row of data-source names followed by a row of states.
719 buffer.append("<tr class=\"headerRow\"><th id=\"header1\">")
720 .append("Name:").append("</th>");
721 for (int i = 0; i < list.size(); i++) {
722 buffer.append("<th id=\"header").append(2 + i).append("\">");
723 buffer.append(((CachedDataSource) list.get(i)).getDbConnectionName()).append("</th>");
725 buffer.append("</tr>");
727 buffer.append("<tr><td>State:</td>");
728 for (int i = 0; i < list.size(); i++) {
729 if (broken.contains(list.get(i))) {
730 buffer.append("<td>in recovery</td>");
732 if (dsQueue.contains(list.get(i))) {
733 if (dsQueue.peek() == list.get(i))
734 buffer.append("<td>active</td>");
736 buffer.append("<td>standby</td>");
739 buffer.append("</tr>");
// Plain-text variant: "Name: <name>\tState: <state>" for each data source.
742 for (int i = 0; i < list.size(); i++) {
743 buffer.append("Name: ").append(((CachedDataSource) list.get(i)).getDbConnectionName());
744 buffer.append("\tState: ");
745 if (broken.contains(list.get(i))) {
746 buffer.append("in recovery");
748 if (dsQueue.contains(list.get(i))) {
749 if (dsQueue.peek() == list.get(i))
750 buffer.append("active");
752 buffer.append("standby");
759 return buffer.toString();
// Wrapper-introspection method required by javax.sql.DataSource.
// NOTE(review): the body is not visible in this view.
762 public boolean isWrapperFor(Class<?> iface) throws SQLException {
// Wrapper-introspection method required by javax.sql.DataSource.
// NOTE(review): the body is not visible in this view.
766 public <T> T unwrap(Class<T> iface) throws SQLException {
771 * @return the monitorDbResponse
// True only when both recovery mode and DB response monitoring are enabled.
773 public final boolean isMonitorDbResponse() {
774 return recoveryMode && monitorDbResponse;
778 CachedDataSource obj = dsQueue.peek();
779 Exception ption = new Exception();
781 for(int i=0; i<5; i++)
783 handleGetConnectionException(obj, ption);
785 } catch(Throwable exc){
786 LOGGER.warn("", exc);
// Returns the preferred data source name, computed by getPreferredDataSourceName with the
// shared dsSelector flag.
// NOTE(review): a line is missing between the signature and the return in this view.
790 public String getPreferredDSName(){
792 return getPreferredDataSourceName(dsSelector);
// Picks a data source name from a snapshot of dsQueue. With more than one source queued, the
// processed-connection counts of the first and last snapshot entries are compared and the
// snapshot may be reversed; the name at the head of the snapshot is returned.
// NOTE(review): the statements inside the delta branches and the guard around the reversal are
// not visible in this view, so the exact selection rule cannot be confirmed here.
// NOTE(review): snapshot.peek() is null when dsQueue is empty, which would raise
// NullPointerException at the return unless a guard exists in the missing lines.
797 public String getPreferredDataSourceName(AtomicBoolean flipper) {
799 LinkedList<CachedDataSource> snapshot = new LinkedList<CachedDataSource>(dsQueue);
800 if(snapshot.size() > 1){
801 CachedDataSource first = snapshot.getFirst();
802 CachedDataSource last = snapshot.getLast();
804 int delta = first.getMonitor().getPorcessedConnectionsCount() - last.getMonitor().getPorcessedConnectionsCount();
807 } else if(delta > 0) {
810 // check the last value and return !last
// NOTE(review): get() followed by getAndSet() is two separate operations, so concurrent callers
// can read the same value and the toggle is not atomic.
811 flipper.getAndSet(!flipper.get());
815 Collections.reverse(snapshot);
817 return snapshot.peek().getDbConnectionName();
// Parent-logger accessor required by the DataSource contract (javax.sql.CommonDataSource).
// NOTE(review): the body is not visible in this view.
820 public java.util.logging.Logger getParentLogger()
821 throws SQLFeatureNotSupportedException {
// Returns the name computed by getMasterDataSourceName with the shared dsSelector flag.
// NOTE(review): a line is missing between the signature and the return in this view.
825 public String getMasterName() {
827 return getMasterDataSourceName(dsSelector);
// Picks a data source name from a snapshot of dsQueue. With more than one source queued, the
// processed-connection counts of the first and last snapshot entries are compared and the
// snapshot may be reversed; the name at the head of the snapshot is returned.
// NOTE(review): every visible statement here matches getPreferredDataSourceName; if the missing
// lines match as well, one method could delegate to the other. Nothing visible checks isSlave(),
// so confirm this really selects the master.
// NOTE(review): snapshot.peek() is null when dsQueue is empty, which would raise
// NullPointerException at the return unless a guard exists in the missing lines.
833 private String getMasterDataSourceName(AtomicBoolean flipper) {
835 LinkedList<CachedDataSource> snapshot = new LinkedList<CachedDataSource>(dsQueue);
836 if(snapshot.size() > 1){
837 CachedDataSource first = snapshot.getFirst();
838 CachedDataSource last = snapshot.getLast();
840 int delta = first.getMonitor().getPorcessedConnectionsCount() - last.getMonitor().getPorcessedConnectionsCount();
843 } else if(delta > 0) {
846 // check the last value and return !last
// NOTE(review): get() followed by getAndSet() is two separate operations, so concurrent callers
// can read the same value and the toggle is not atomic.
847 flipper.getAndSet(!flipper.get());
851 Collections.reverse(snapshot);
853 return snapshot.peek().getDbConnectionName();
856 class RemindTask extends TimerTask {
858 CachedDataSource ds = dsQueue.peek();
860 ds.getPoolInfo(false);