X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=dblib%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopenecomp%2Fsdnc%2Fsli%2Fresource%2Fdblib%2FDBResourceManager.java;fp=dblib%2Fprovider%2Fsrc%2Fmain%2Fjava%2Forg%2Fopenecomp%2Fsdnc%2Fsli%2Fresource%2Fdblib%2FDBResourceManager.java;h=5cf2953b7c6b75fe1c1cbf46f5c486e256ada942;hb=2a93b9ddf308b6bdd135be182c1b3fa779891840;hp=0000000000000000000000000000000000000000;hpb=f5882cb4cd6338dcd9780c770056f4047c9ff174;p=ccsdk%2Fsli%2Fcore.git diff --git a/dblib/provider/src/main/java/org/openecomp/sdnc/sli/resource/dblib/DBResourceManager.java b/dblib/provider/src/main/java/org/openecomp/sdnc/sli/resource/dblib/DBResourceManager.java new file mode 100644 index 00000000..5cf2953b --- /dev/null +++ b/dblib/provider/src/main/java/org/openecomp/sdnc/sli/resource/dblib/DBResourceManager.java @@ -0,0 +1,862 @@ +/*- + * ============LICENSE_START======================================================= + * openecomp + * ================================================================================ + * Copyright (C) 2016 - 2017 AT&T + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.sdnc.sli.resource.dblib; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.SQLDataException; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.sql.SQLSyntaxErrorException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Observable; +import java.util.PriorityQueue; +import java.util.Properties; +import java.util.Queue; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.sql.DataSource; +import javax.sql.rowset.CachedRowSet; + +import org.apache.tomcat.jdbc.pool.PoolExhaustedException; +import org.openecomp.sdnc.sli.resource.dblib.config.DbConfigPool; +import org.openecomp.sdnc.sli.resource.dblib.factory.AbstractDBResourceManagerFactory; +import org.openecomp.sdnc.sli.resource.dblib.factory.AbstractResourceManagerFactory; +import org.openecomp.sdnc.sli.resource.dblib.factory.DBConfigFactory; +import org.openecomp.sdnc.sli.resource.dblib.pm.PollingWorker; +import org.openecomp.sdnc.sli.resource.dblib.pm.SQLExecutionMonitor; +import com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * @version $Revision: 1.15 $ + * Change Log + * Author Date Comments + * ============== ======== ==================================================== + * Rich Tabedzki + */ +public class DBResourceManager implements DataSource, DataAccessor, DBResourceObserver, DbLibService { + private static Logger LOGGER = LoggerFactory.getLogger(DBResourceManager.class); + + transient boolean terminating = false; + transient protected long retryInterval = 10000L; + transient boolean recoveryMode = true; + + protected final AtomicBoolean dsSelector = new AtomicBoolean(); + +// Queue dsQueue = new ConcurrentLinkedQueue(); + Queue dsQueue = new PriorityQueue(4, new Comparator(){ + + @Override + public int compare(CachedDataSource left, CachedDataSource right) { + try { + if(!left.isSlave()) + return -1; + if(!right.isSlave()) + return 1; + + } catch (Throwable e) { + LOGGER.warn("", e); + } + return 0; + } + + }); + protected final Set broken = Collections.synchronizedSet(new HashSet()); + protected final Object monitor = new Object(); + protected final Properties configProps; + protected final Thread worker; + + protected final long terminationTimeOut; + protected final boolean monitorDbResponse; + protected final long monitoringInterval; + protected final long monitoringInitialDelay; + protected final long expectedCompletionTime; + protected final long unprocessedFailoverThreshold; + + public DBResourceManager(Properties props){ + this.configProps = props; + + // get retry interval value + retryInterval = getLongFromProperties(props, "org.openecomp.dblib.connection.retry", 10000L); + + // get recovery mode flag + recoveryMode = getBooleanFromProperties(props, "org.openecomp.dblib.connection.recovery", true); + if(!recoveryMode) + { + recoveryMode = false; + LOGGER.info("Recovery Mode disabled"); + } + // get time out value for thread cleanup + terminationTimeOut = getLongFromProperties(props, "org.openecomp.dblib.termination.timeout", 300000L); + // get properties for monitoring + monitorDbResponse = getBooleanFromProperties(props, "org.openecomp.dblib.connection.monitor", false); + monitoringInterval = getLongFromProperties(props, "org.openecomp.dblib.connection.monitor.interval", 1000L); + monitoringInitialDelay = getLongFromProperties(props, "org.openecomp.dblib.connection.monitor.startdelay", 5000L); + expectedCompletionTime = getLongFromProperties(props, "org.openecomp.dblib.connection.monitor.expectedcompletiontime", 5000L); + unprocessedFailoverThreshold = getLongFromProperties(props, "org.openecomp.dblib.connection.monitor.unprocessedfailoverthreshold", 3L); + + // initialize performance monitor + PollingWorker.createInistance(props); + + // initialize recovery thread + worker = new RecoveryMgr(); + worker.setName("DBResourcemanagerWatchThread"); + worker.setDaemon(true); + worker.start(); + } + + private void config(Properties ctx) throws Exception { + + DbConfigPool dbConfig = DBConfigFactory.createConfig(this.configProps); + + try { + AbstractResourceManagerFactory factory = AbstractDBResourceManagerFactory.getFactory(dbConfig.getType()); + if(LOGGER.isInfoEnabled()){ + LOGGER.info("Default DB config is : " + dbConfig.getType()); + LOGGER.info("Using factory : " + factory.getClass().getName()); + } + CachedDataSource[] cachedDS = factory.initDBResourceManager(dbConfig, this); + if(cachedDS == null || cachedDS.length == 0) { + LOGGER.error("Initialization of CachedDataSources failed. No instance was created."); + throw new Exception("Failed to initialize DB Library. No data source was created."); + } + + for(int i=0; i 1){ + handleGetConnectionException(dataSource, new Exception(data.toString())); + } + } + } + } + } + + public void testForceRecovery() + { + CachedDataSource active = (CachedDataSource) this.dsQueue.peek(); + handleGetConnectionException(active, new Exception("test")); + } + + class RecoveryMgr extends Thread { + + public void run() { + while(!terminating) + { + try { + Thread.sleep(retryInterval); + } catch (InterruptedException e1) { } + CachedDataSource brokenSource = null; + try { + if (!broken.isEmpty()) { + CachedDataSource[] sourceArray = broken.toArray(new CachedDataSource[0]); + for (int i = 0; i < sourceArray.length; i++) + { + brokenSource = sourceArray[i]; + if (brokenSource instanceof TerminatingCachedDataSource) + break; + if (resetConnectionPool(brokenSource)) { + broken.remove(brokenSource); + brokenSource.blockImmediateOffLine(); + dsQueue.add(brokenSource); + LOGGER.info("DataSource <" + + brokenSource.getDbConnectionName() + + "> recovered."); + } + brokenSource = null; + } + } + } catch (Exception exc) { + LOGGER.warn(exc.getMessage()); + if(brokenSource != null){ + try { + if(!broken.contains(brokenSource)) + broken.add(brokenSource); + brokenSource = null; + } catch (Exception e1) { } + } + } + } + LOGGER.info("DBResourceManager.RecoveryMgr <"+this.toString() +"> terminated." ); + } + + private boolean resetConnectionPool(CachedDataSource dataSource){ + try { + return dataSource.testConnection(); + } catch (Exception exc) { + LOGGER.info("DataSource <" + dataSource.getDbConnectionName() + "> resetCache failed with error: "+ exc.getMessage()); + return false; + } + } + } + + /* (non-Javadoc) + * @see org.openecomp.sdnc.sli.resource.dblib.DbLibService#getData(java.lang.String, java.util.ArrayList, java.lang.String) + */ + @Override + public CachedRowSet getData(String statement, ArrayList arguments, String preferredDS) throws SQLException { + ArrayList newList=new ArrayList(); + if(arguments != null && !arguments.isEmpty()) { + newList.addAll(arguments); + } + if(recoveryMode) + return requestDataWithRecovery(statement, newList, preferredDS); + else + return requestDataNoRecovery(statement, newList, preferredDS); + } + + private CachedRowSet requestDataWithRecovery(String statement, ArrayList arguments, String preferredDS) throws SQLException { + Throwable lastException = null; + CachedDataSource active = null; + + // test if there are any connection pools available + LinkedList sources = new LinkedList(this.dsQueue); + if(sources.isEmpty()){ + LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available."); + throw new DBLibException("No active DB connection pools are available in RequestDataWithRecovery call."); + } + if(preferredDS != null && !sources.peek().getDbConnectionName().equals(preferredDS)) { + Collections.reverse(sources); + } + + + // loop through available data sources to retrieve data. + while(!sources.isEmpty()) + { + active = sources.peek(); + + long time = System.currentTimeMillis(); + try { + if(!active.isFabric()) { + CachedDataSource master = findMaster(); + if(master != null) { + active = master; + master = null; + } + } + sources.remove(active); + return active.getData(statement, arguments); + } catch(SQLDataException exc){ + throw exc; + } catch(SQLSyntaxErrorException exc){ + throw exc; + } catch(SQLIntegrityConstraintViolationException exc){ + throw exc; + } catch(Throwable exc){ + lastException = exc; + String message = exc.getMessage(); + if(message == null) { + if(exc.getCause() != null) { + message = exc.getCause().getMessage(); + } + if(message == null) + message = exc.getClass().getName(); + } + LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message); + handleGetConnectionException(active, exc); + } finally { + if(LOGGER.isDebugEnabled()){ + time = (System.currentTimeMillis() - time); + LOGGER.debug("getData processing time : "+ active.getDbConnectionName()+" "+time+" miliseconds."); + } + } + } + if(lastException instanceof SQLException){ + throw (SQLException)lastException; + } + // repackage the exception + // you are here because either you run out of available data sources + // or the last exception was not of SQLException type. + // repackage the exception + if(lastException == null) { + throw new DBLibException("The operation timed out while waiting to acquire a new connection." ); + } else { + SQLException exception = new DBLibException(lastException.getMessage()); + exception.setStackTrace(lastException.getStackTrace()); + if(lastException.getCause() instanceof SQLException) { + throw (SQLException)lastException.getCause(); + } + throw exception; + } + } + + private CachedRowSet requestDataNoRecovery(String statement, ArrayList arguments, String preferredDS) throws SQLException { + if(dsQueue.isEmpty()){ + LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available."); + throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call."); + } + CachedDataSource active = (CachedDataSource) this.dsQueue.peek(); + long time = System.currentTimeMillis(); + try { + if(!active.isFabric()) { + CachedDataSource master = findMaster(); + if(master != null) + active = master; + } + return active.getData(statement, arguments); +// } catch(SQLDataException exc){ +// throw exc; + } catch(Throwable exc){ + String message = exc.getMessage(); + if(message == null) + message = exc.getClass().getName(); + LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message); + if(exc instanceof SQLException) + throw (SQLException)exc; + else { + DBLibException excptn = new DBLibException(exc.getMessage()); + excptn.setStackTrace(exc.getStackTrace()); + throw excptn; + } + } finally { + if(LOGGER.isDebugEnabled()){ + time = (System.currentTimeMillis() - time); + LOGGER.debug(">> getData : "+ active.getDbConnectionName()+" "+time+" miliseconds."); + } + } + } + + + /* (non-Javadoc) + * @see org.openecomp.sdnc.sli.resource.dblib.DbLibService#writeData(java.lang.String, java.util.ArrayList, java.lang.String) + */ + @Override + public boolean writeData(String statement, ArrayList arguments, String preferredDS) throws SQLException + { + ArrayList newList=new ArrayList(); + if(arguments != null && !arguments.isEmpty()) { + newList.addAll(arguments); + } + + return writeDataNoRecovery(statement, newList, preferredDS); + } + + CachedDataSource findMaster() throws PoolExhaustedException, MySQLNonTransientConnectionException { + CachedDataSource master = null; + CachedDataSource[] dss = this.dsQueue.toArray(new CachedDataSource[0]); + for(int i=0; i arguments, String preferredDS) throws SQLException { + if(dsQueue.isEmpty()){ + LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available."); + throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call."); + } + + boolean initialRequest = true; + boolean retryAllowed = true; + CachedDataSource active = (CachedDataSource) this.dsQueue.peek(); + long time = System.currentTimeMillis(); + while(initialRequest) { + initialRequest = false; + try { + if(!active.isFabric()) { + CachedDataSource master = findMaster(); + if(master != null) { + active = master; + } + } + + return active.writeData(statement, arguments); + } catch(Throwable exc){ + String message = exc.getMessage(); + if(message == null) + message = exc.getClass().getName(); + LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message); + if(exc instanceof SQLException) { + SQLException sqlExc = SQLException.class.cast(exc); + // handle read-only exception + if(sqlExc.getErrorCode() == 1290 && "HY000".equals(sqlExc.getSQLState())) { + LOGGER.warn("retrying due to: " + sqlExc.getMessage()); + dsQueue.remove(active); + dsQueue.add(active); + if(retryAllowed){ + retryAllowed = false; + initialRequest = true; + continue; + } + } + throw (SQLException)exc; + } else { + DBLibException excptn = new DBLibException(exc.getMessage()); + excptn.setStackTrace(exc.getStackTrace()); + throw excptn; + } + } finally { + if(LOGGER.isDebugEnabled()){ + time = (System.currentTimeMillis() - time); + LOGGER.debug("writeData processing time : "+ active.getDbConnectionName()+" "+time+" miliseconds."); + } + } + } + return true; + } + + private void setDataSource(CachedDataSource dataSource) { + if(dataSource.testConnection(true)){ + this.dsQueue.add(dataSource); + } else { + this.broken.add(dataSource); + } + } + + public Connection getConnection() throws SQLException { + Throwable lastException = null; + CachedDataSource active = null; + + if(dsQueue.isEmpty()){ + throw new DBLibException("No active DB connection pools are available in GetConnection call."); + } + + try { + active = dsQueue.peek(); + CachedDataSource tmpActive = findMaster(); + if(tmpActive != null) { + active = tmpActive; + } + return new DBLibConnection(active.getConnection(), active); + } catch(javax.sql.rowset.spi.SyncFactoryException exc){ + LOGGER.debug("Free memory (bytes): " + Runtime.getRuntime().freeMemory()); + LOGGER.warn("CLASSPATH issue. Allowing retry", exc); + lastException = exc; + } catch(PoolExhaustedException exc) { + throw new NoAvailableConnectionsException(exc); + } catch(MySQLNonTransientConnectionException exc){ + throw new NoAvailableConnectionsException(exc); + } catch(Exception exc){ + lastException = exc; + if(recoveryMode){ + handleGetConnectionException(active, exc); + } else { + if(exc instanceof MySQLNonTransientConnectionException) { + throw new NoAvailableConnectionsException(exc); + } if(exc instanceof SQLException) { + throw (SQLException)exc; + } else { + DBLibException excptn = new DBLibException(exc.getMessage()); + excptn.setStackTrace(exc.getStackTrace()); + throw excptn; + } + } + } catch (Throwable trwb) { + DBLibException excptn = new DBLibException(trwb.getMessage()); + excptn.setStackTrace(trwb.getStackTrace()); + throw excptn; + } finally { + if(LOGGER.isDebugEnabled()){ + displayState(); + } + } + + if(lastException instanceof SQLException){ + throw (SQLException)lastException; + } + // repackage the exception + if(lastException == null) { + throw new DBLibException("The operation timed out while waiting to acquire a new connection." ); + } else { + SQLException exception = new DBLibException(lastException.getMessage()); + exception.setStackTrace(lastException.getStackTrace()); + if(lastException.getCause() instanceof SQLException) { +// exception.setNextException((SQLException)lastException.getCause()); + throw (SQLException)lastException.getCause(); + } + throw exception; + } + } + + public Connection getConnection(String username, String password) + throws SQLException { + CachedDataSource active = null; + + if(dsQueue.isEmpty()){ + throw new DBLibException("No active DB connection pools are available in GetConnection call."); + } + + + try { + active = dsQueue.peek(); + CachedDataSource tmpActive = findMaster(); + if(tmpActive != null) { + active = tmpActive; + } + return active.getConnection(username, password); + } catch(Throwable exc){ + if(recoveryMode){ + handleGetConnectionException(active, exc); + } else { + if(exc instanceof SQLException) + throw (SQLException)exc; + else { + DBLibException excptn = new DBLibException(exc.getMessage()); + excptn.setStackTrace(exc.getStackTrace()); + throw excptn; + } + } + + } + + throw new DBLibException("No connections available in DBResourceManager in GetConnection call."); + } + + private void handleGetConnectionException(CachedDataSource source, Throwable exc) { + try { + if(!source.canTakeOffLine()) + { + LOGGER.error("Could not switch due to blocking"); + return; + } + + boolean removed = dsQueue.remove(source); + if(!broken.contains(source)) + { + if(broken.add(source)) + { + LOGGER.warn("DB Recovery: DataSource <" + source.getDbConnectionName() + "> put in the recovery mode. Reason : " + exc.getMessage()); + } else { + LOGGER.warn("Error putting DataSource <" +source.getDbConnectionName()+ "> in recovery mode."); + } + } else { + LOGGER.info("DB Recovery: DataSource <" + source.getDbConnectionName() + "> already in recovery queue"); + } + if(removed) + { + if(!dsQueue.isEmpty()) + { + LOGGER.warn("DB DataSource <" + dsQueue.peek().getDbConnectionName() + "> became active"); + } + } + } catch (Exception e) { + LOGGER.error("", e); + } + } + + public void cleanUp() { + for(Iterator it=dsQueue.iterator();it.hasNext();){ + CachedDataSource cds = (CachedDataSource)it.next(); + it.remove(); + cds.cleanUp(); + } + + try { + this.terminating = true; + if(broken != null) + { + try { + broken.add( new TerminatingCachedDataSource(null)); + } catch(Exception exc){ + LOGGER.error("Waiting for Worker to stop", exc); + } + } + worker.join(terminationTimeOut); + LOGGER.info("DBResourceManager.RecoveryMgr <"+worker.toString() +"> termination was successful: " + worker.getState()); + } catch(Exception exc){ + LOGGER.error("Waiting for Worker thread to terminate ", exc); + } + } + + public static DBResourceManager create(Properties props) throws Exception { + DBResourceManager dbmanager = new DBResourceManager(props); + dbmanager.config(props); + return dbmanager; + } + + public PrintWriter getLogWriter() throws SQLException { + return ((CachedDataSource)this.dsQueue.peek()).getLogWriter(); + } + + public int getLoginTimeout() throws SQLException { + return ((CachedDataSource)this.dsQueue.peek()).getLoginTimeout(); + } + + public void setLogWriter(PrintWriter out) throws SQLException { + ((CachedDataSource)this.dsQueue.peek()).setLogWriter(out); + } + + public void setLoginTimeout(int seconds) throws SQLException { + ((CachedDataSource)this.dsQueue.peek()).setLoginTimeout(seconds); + } + + public void displayState(){ + if(LOGGER.isDebugEnabled()){ + LOGGER.debug("POOLS : Active = "+dsQueue.size() + ";\t Broken = "+broken.size()); + CachedDataSource current = (CachedDataSource)dsQueue.peek(); + if(current != null) { + LOGGER.debug("POOL : Active name = \'"+current.getDbConnectionName()+ "\'"); + } + } + } + + /* (non-Javadoc) + * @see org.openecomp.sdnc.sli.resource.dblib.DbLibService#isActive() + */ + @Override + public boolean isActive() { + return this.dsQueue.size()>0; + } + + public String getActiveStatus(){ + return "Connected: " + dsQueue.size()+"\tIn-recovery: "+broken.size(); + } + + public String getDBStatus(boolean htmlFormat) { + StringBuilder buffer = new StringBuilder(); + + ArrayList list = new ArrayList(); + list.addAll(dsQueue); + list.addAll(broken); + if (htmlFormat) + { + buffer.append("") + .append("Name:").append(""); + for (int i = 0; i < list.size(); i++) { + buffer.append(""); + buffer.append(((CachedDataSource) list.get(i)).getDbConnectionName()).append(""); + } + buffer.append(""); + + buffer.append("State:"); + for (int i = 0; i < list.size(); i++) { + if (broken.contains(list.get(i))) { + buffer.append("in recovery"); + } + if (dsQueue.contains(list.get(i))) { + if (dsQueue.peek() == list.get(i)) + buffer.append("active"); + else + buffer.append("standby"); + } + } + buffer.append(""); + + } else { + for (int i = 0; i < list.size(); i++) { + buffer.append("Name: ").append(((CachedDataSource) list.get(i)).getDbConnectionName()); + buffer.append("\tState: "); + if (broken.contains(list.get(i))) { + buffer.append("in recovery"); + } else + if (dsQueue.contains(list.get(i))) { + if (dsQueue.peek() == list.get(i)) + buffer.append("active"); + else + buffer.append("standby"); + } + + buffer.append("\n"); + + } + } + return buffer.toString(); + } + + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } + + public T unwrap(Class iface) throws SQLException { + return null; + } + + /** + * @return the monitorDbResponse + */ + public final boolean isMonitorDbResponse() { + return recoveryMode && monitorDbResponse; + } + + public void test(){ + CachedDataSource obj = dsQueue.peek(); + Exception ption = new Exception(); + try { + for(int i=0; i<5; i++) + { + handleGetConnectionException(obj, ption); + } + } catch(Throwable exc){ + LOGGER.warn("", exc); + } + } + + public String getPreferredDSName(){ + if(isActive()){ + return getPreferredDataSourceName(dsSelector); + } + return ""; + } + + public String getPreferredDataSourceName(AtomicBoolean flipper) { + + LinkedList snapshot = new LinkedList(dsQueue); + if(snapshot.size() > 1){ + CachedDataSource first = snapshot.getFirst(); + CachedDataSource last = snapshot.getLast(); + + int delta = first.getMonitor().getPorcessedConnectionsCount() - last.getMonitor().getPorcessedConnectionsCount(); + if(delta < 0) { + flipper.set(false); + } else if(delta > 0) { + flipper.set(true); + } else { + // check the last value and return !last + flipper.getAndSet(!flipper.get()); + } + + if (flipper.get()) + Collections.reverse(snapshot); + } + return snapshot.peek().getDbConnectionName(); + } + + public java.util.logging.Logger getParentLogger() + throws SQLFeatureNotSupportedException { + return null; + } + + public String getMasterName() { + if(isActive()){ + return getMasterDataSourceName(dsSelector); + } + return ""; + } + + + private String getMasterDataSourceName(AtomicBoolean flipper) { + + LinkedList snapshot = new LinkedList(dsQueue); + if(snapshot.size() > 1){ + CachedDataSource first = snapshot.getFirst(); + CachedDataSource last = snapshot.getLast(); + + int delta = first.getMonitor().getPorcessedConnectionsCount() - last.getMonitor().getPorcessedConnectionsCount(); + if(delta < 0) { + flipper.set(false); + } else if(delta > 0) { + flipper.set(true); + } else { + // check the last value and return !last + flipper.getAndSet(!flipper.get()); + } + + if (flipper.get()) + Collections.reverse(snapshot); + } + return snapshot.peek().getDbConnectionName(); + } + + class RemindTask extends TimerTask { + public void run() { + CachedDataSource ds = dsQueue.peek(); + if(ds != null) + ds.getPoolInfo(false); + } + } +}