2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2016 - 2017 AT&T
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdnc.sli.resource.dblib;
23 import java.io.PrintWriter;
24 import java.sql.Connection;
25 import java.sql.SQLDataException;
26 import java.sql.SQLException;
27 import java.sql.SQLFeatureNotSupportedException;
28 import java.sql.SQLIntegrityConstraintViolationException;
29 import java.sql.SQLSyntaxErrorException;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.Comparator;
33 import java.util.HashSet;
34 import java.util.Iterator;
35 import java.util.LinkedList;
36 import java.util.Observable;
37 import java.util.PriorityQueue;
38 import java.util.Properties;
39 import java.util.Queue;
41 import java.util.Timer;
42 import java.util.TimerTask;
43 import java.util.concurrent.ConcurrentLinkedQueue;
44 import java.util.concurrent.atomic.AtomicBoolean;
46 import javax.sql.DataSource;
47 import javax.sql.rowset.CachedRowSet;
49 import org.apache.tomcat.jdbc.pool.PoolExhaustedException;
50 import org.openecomp.sdnc.sli.resource.dblib.config.DbConfigPool;
51 import org.openecomp.sdnc.sli.resource.dblib.factory.AbstractDBResourceManagerFactory;
52 import org.openecomp.sdnc.sli.resource.dblib.factory.AbstractResourceManagerFactory;
53 import org.openecomp.sdnc.sli.resource.dblib.factory.DBConfigFactory;
54 import org.openecomp.sdnc.sli.resource.dblib.pm.PollingWorker;
55 import org.openecomp.sdnc.sli.resource.dblib.pm.SQLExecutionMonitor;
56 import com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
62 * @version $Revision: 1.15 $
64 * Author Date Comments
65 * ============== ======== ====================================================
// Manager that keeps a queue of usable CachedDataSource pools (dsQueue) and a set of pools
// awaiting recovery (broken), and exposes them through the DataSource, DataAccessor,
// DBResourceObserver and DbLibService interfaces.
68 public class DBResourceManager implements DataSource, DataAccessor, DBResourceObserver, DbLibService {
// NOTE(review): LOGGER is never reassigned in the visible code; it could be declared final.
69 private static Logger LOGGER = LoggerFactory.getLogger(DBResourceManager.class);
// Set to true by cleanUp() when the manager is being shut down.
71 transient boolean terminating = false;
// Sleep time (ms, passed to Thread.sleep) used by RecoveryMgr; overwritten in the constructor
// from "org.openecomp.dblib.connection.retry".
72 transient protected long retryInterval = 10000L;
// Selects requestDataWithRecovery vs. requestDataNoRecovery behaviour and gates the
// monitor-driven switch in update(); read from "org.openecomp.dblib.connection.recovery".
73 transient boolean recoveryMode = true;
// Flag handed as the "flipper" argument to getPreferredDataSourceName/getMasterDataSourceName.
75 protected final AtomicBoolean dsSelector = new AtomicBoolean();
77 // Queue<CachedDataSource> dsQueue = new ConcurrentLinkedQueue<CachedDataSource>();
// Usable data sources; the head (peek()) is treated as the active one throughout this class.
// Ordering comes from the anonymous comparator below, whose body is not visible in this excerpt.
// NOTE(review): java.util.PriorityQueue is not synchronized, yet RecoveryMgr (a separate Thread)
// adds to this queue while request methods peek/remove; no locking is visible here — confirm.
78 Queue<CachedDataSource> dsQueue = new PriorityQueue<CachedDataSource>(4, new Comparator<CachedDataSource>(){
81 public int compare(CachedDataSource left, CachedDataSource right) {
88 } catch (Throwable e) {
// Data sources removed from dsQueue and waiting for RecoveryMgr to re-test them.
95 protected final Set<CachedDataSource> broken = Collections.synchronizedSet(new HashSet<CachedDataSource>());
// Presumably a lock object; its uses are not visible in this excerpt — TODO confirm.
96 protected final Object monitor = new Object();
// Properties passed to the constructor; config() builds the DbConfigPool from them.
97 protected final Properties configProps;
// The RecoveryMgr thread created in the constructor and joined in cleanUp().
98 protected final Thread worker;
// Maximum time (ms, passed to Thread.join) cleanUp() waits for the worker to finish.
100 protected final long terminationTimeOut;
// Monitoring settings read in the constructor; the four numeric values are pushed to every
// initialized CachedDataSource in config(). Units are not established in this file.
101 protected final boolean monitorDbResponse;
102 protected final long monitoringInterval;
103 protected final long monitoringInitialDelay;
104 protected final long expectedCompletionTime;
105 protected final long unprocessedFailoverThreshold;
// Builds the manager from configuration properties: reads the retry, recovery, termination and
// monitoring settings (falling back to the literal defaults below), initialises the
// PollingWorker performance monitor and creates the RecoveryMgr watch thread.
// Data sources are attached afterwards by config() (see create()).
// NOTE(review): some lines of this constructor are not visible in this excerpt (the condition
// guarding "Recovery Mode disabled" and whatever follows setDaemon), so where the worker thread
// is started cannot be confirmed from here.
107 public DBResourceManager(Properties props){
108 this.configProps = props;
110 // get retry interval value
111 retryInterval = getLongFromProperties(props, "org.openecomp.dblib.connection.retry", 10000L);
113 // get recovery mode flag
114 recoveryMode = getBooleanFromProperties(props, "org.openecomp.dblib.connection.recovery", true);
// The guard for the next two statements is not visible; presumably they run only when
// recovery was switched off by the property above — TODO confirm.
117 recoveryMode = false;
118 LOGGER.info("Recovery Mode disabled");
120 // get time out value for thread cleanup
121 terminationTimeOut = getLongFromProperties(props, "org.openecomp.dblib.termination.timeout", 300000L);
122 // get properties for monitoring
123 monitorDbResponse = getBooleanFromProperties(props, "org.openecomp.dblib.connection.monitor", false);
124 monitoringInterval = getLongFromProperties(props, "org.openecomp.dblib.connection.monitor.interval", 1000L);
125 monitoringInitialDelay = getLongFromProperties(props, "org.openecomp.dblib.connection.monitor.startdelay", 5000L);
126 expectedCompletionTime = getLongFromProperties(props, "org.openecomp.dblib.connection.monitor.expectedcompletiontime", 5000L);
127 unprocessedFailoverThreshold = getLongFromProperties(props, "org.openecomp.dblib.connection.monitor.unprocessedfailoverthreshold", 3L);
129 // initialize performance monitor
130 PollingWorker.createInistance(props);
132 // initialize recovery thread
133 worker = new RecoveryMgr();
134 worker.setName("DBResourcemanagerWatchThread");
// Daemon thread: it will not keep the JVM alive on its own.
135 worker.setDaemon(true);
// Creates the data sources described by the configuration and registers them with this manager.
// Steps visible here: build a DbConfigPool from configProps, pick the factory matching its type,
// let the factory create the CachedDataSource array, fail if none was created, then for every
// initialized entry: classify it via setDataSource(), copy the monitoring settings onto it and
// subscribe this manager as its observer.
// Throws Exception when the factory returns no data source.
// NOTE(review): the ctx parameter is not used in the visible lines (this.configProps is read
// instead), and the try block belonging to the trailing catch is not visible in this excerpt.
139 private void config(Properties ctx) throws Exception {
141 DbConfigPool dbConfig = DBConfigFactory.createConfig(this.configProps);
144 AbstractResourceManagerFactory factory = AbstractDBResourceManagerFactory.getFactory(dbConfig.getType());
145 if(LOGGER.isInfoEnabled()){
146 LOGGER.info("Default DB config is : " + dbConfig.getType());
147 LOGGER.info("Using factory : " + factory.getClass().getName());
149 CachedDataSource[] cachedDS = factory.initDBResourceManager(dbConfig, this);
150 if(cachedDS == null || cachedDS.length == 0) {
151 LOGGER.error("Initialization of CachedDataSources failed. No instance was created.");
152 throw new Exception("Failed to initialize DB Library. No data source was created.");
155 for(int i=0; i<cachedDS.length; i++){
// Null or uninitialized entries are skipped entirely (neither queued nor marked broken).
156 if(cachedDS[i] != null && cachedDS[i].isInitialized()){
157 setDataSource(cachedDS[i]);
158 cachedDS[i].setInterval(monitoringInterval);
159 cachedDS[i].setInitialDelay(monitoringInitialDelay);
160 cachedDS[i].setExpectedCompletionTime(expectedCompletionTime);
161 cachedDS[i].setUnprocessedFailoverThreshold(unprocessedFailoverThreshold);
162 cachedDS[i].addObserver(this);
166 } catch(Exception exc){
171 private long getLongFromProperties(Properties props, String property, long defaultValue)
174 long tmpLongValue = defaultValue;
176 value = (String)props.getProperty(property);
178 tmpLongValue = Long.parseLong(value);
180 } catch(NumberFormatException exc) {
181 if(LOGGER.isWarnEnabled()){
182 LOGGER.warn("'"+property+"'=" + value+" is invalid. It should be a numeric value");
184 } catch(Exception exc) {
190 private boolean getBooleanFromProperties(Properties props, String property, boolean defaultValue)
192 boolean tmpValue = defaultValue;
196 value = (String)props.getProperty(property);
198 tmpValue = Boolean.parseBoolean(value);
200 } catch(NumberFormatException exc) {
201 if(LOGGER.isWarnEnabled()){
202 LOGGER.warn("'"+property+"'=" + value+" is invalid. It should be a boolean value");
204 } catch(Exception exc) {
// Observer callback. When the notification comes from a SQLExecutionMonitor whose parent is the
// CachedDataSource currently at the head of dsQueue, and recovery mode is on with more than one
// queued source, that head source is handed to handleGetConnectionException() so another
// source can take over. Notifications from anything else are ignored.
// NOTE(review): data.toString() is called without a null check; java.util.Observable may
// notify with a null argument — confirm the monitor always supplies one.
211 public void update(Observable observable, Object data) {
212 // if observable is active and there is a standby available, switch
213 if(observable instanceof SQLExecutionMonitor)
// This local shadows the "monitor" field of the enclosing class.
215 SQLExecutionMonitor monitor = (SQLExecutionMonitor)observable;
216 if(monitor.getParent() instanceof CachedDataSource)
218 CachedDataSource dataSource = (CachedDataSource)monitor.getParent();
// Identity comparison: only the instance that is currently the queue head qualifies.
219 if(dataSource == dsQueue.peek())
221 if(recoveryMode && dsQueue.size() > 1){
222 handleGetConnectionException(dataSource, new Exception(data.toString()));
229 public void testForceRecovery()
231 CachedDataSource active = (CachedDataSource) this.dsQueue.peek();
232 handleGetConnectionException(active, new Exception("test"));
// Background thread that periodically re-tests the data sources held in "broken" and moves the
// ones that pass resetConnectionPool() back into dsQueue.
// NOTE(review): the method header, loop and try structure of this class are not visible in this
// excerpt; the comments below describe only the statements that are.
235 class RecoveryMgr extends Thread {
// Wait retryInterval ms between passes.
// NOTE(review): the InterruptedException is swallowed and the interrupt flag is not restored.
241 Thread.sleep(retryInterval);
242 } catch (InterruptedException e1) { }
243 CachedDataSource brokenSource = null;
245 if (!broken.isEmpty()) {
// Work on an array copy so entries can be removed from "broken" while looping.
246 CachedDataSource[] sourceArray = broken.toArray(new CachedDataSource[0]);
247 for (int i = 0; i < sourceArray.length; i++)
249 brokenSource = sourceArray[i];
// cleanUp() adds a TerminatingCachedDataSource to "broken"; what is done when one is found
// here is not visible — presumably it ends the thread. TODO confirm.
250 if (brokenSource instanceof TerminatingCachedDataSource)
252 if (resetConnectionPool(brokenSource)) {
// Connection test passed: take it out of recovery and make it usable again.
253 broken.remove(brokenSource);
254 brokenSource.blockImmediateOffLine();
255 dsQueue.add(brokenSource);
256 LOGGER.info("DataSource <"
257 + brokenSource.getDbConnectionName()
263 } catch (Exception exc) {
264 LOGGER.warn(exc.getMessage());
// On failure make sure the source being processed stays in the recovery set.
265 if(brokenSource != null){
267 if(!broken.contains(brokenSource))
268 broken.add(brokenSource);
270 } catch (Exception e1) { }
274 LOGGER.info("DBResourceManager.RecoveryMgr <"+this.toString() +"> terminated." );
277 private boolean resetConnectionPool(CachedDataSource dataSource){
279 return dataSource.testConnection();
280 } catch (Exception exc) {
281 LOGGER.info("DataSource <" + dataSource.getDbConnectionName() + "> resetCache failed with error: "+ exc.getMessage());
288 * @see org.openecomp.sdnc.sli.resource.dblib.DbLibService#getData(java.lang.String, java.util.ArrayList, java.lang.String)
// Runs a query and returns its rows as a CachedRowSet.
// The String arguments are copied into a fresh ArrayList<Object> (left empty when arguments is
// null or empty) and the call is forwarded to requestDataWithRecovery() or
// requestDataNoRecovery().
// NOTE(review): the condition choosing between the two returns is not visible in this excerpt;
// presumably it is the recoveryMode flag — TODO confirm.
291 public CachedRowSet getData(String statement, ArrayList<String> arguments, String preferredDS) throws SQLException {
292 ArrayList<Object> newList=new ArrayList<Object>();
293 if(arguments != null && !arguments.isEmpty()) {
294 newList.addAll(arguments);
297 return requestDataWithRecovery(statement, newList, preferredDS);
299 return requestDataNoRecovery(statement, newList, preferredDS);
// Query path used when failures should fail over: works on a snapshot of dsQueue and tries the
// sources one after another until one returns data.
// Throws DBLibException when no source is queued, and an SQLException (or a DBLibException
// repackaging the last failure) when every attempt failed.
// NOTE(review): several lines (try/finally structure, the bodies of the first three catch
// clauses, the use of "master") are not visible in this excerpt, so the comments below cover
// only the visible statements.
302 private CachedRowSet requestDataWithRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
303 Throwable lastException = null;
304 CachedDataSource active = null;
306 // test if there are any connection pools available
// Snapshot copy: the loop below removes entries from this list, not from dsQueue.
307 LinkedList<CachedDataSource> sources = new LinkedList<CachedDataSource>(this.dsQueue);
308 if(sources.isEmpty()){
309 LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
310 throw new DBLibException("No active DB connection pools are available in RequestDataWithRecovery call.");
// If the first source is not the preferred one, reverse the order. This puts the preferred
// source first only when it happens to be the last entry — presumably at most two sources are
// expected; verify.
312 if(preferredDS != null && !sources.peek().getDbConnectionName().equals(preferredDS)) {
313 Collections.reverse(sources);
317 // loop through available data sources to retrieve data.
318 while(!sources.isEmpty())
320 active = sources.peek();
322 long time = System.currentTimeMillis();
324 if(!active.isFabric()) {
325 CachedDataSource master = findMaster();
// Each source is attempted at most once per call.
331 sources.remove(active);
332 return active.getData(statement, arguments);
// The handling inside these three SQL-error catches is not visible here.
333 } catch(SQLDataException exc){
335 } catch(SQLSyntaxErrorException exc){
337 } catch(SQLIntegrityConstraintViolationException exc){
339 } catch(Throwable exc){
// Any other failure: derive a message (own message, else the cause's, else the class name),
// raise the alarm log and hand the source to handleGetConnectionException().
341 String message = exc.getMessage();
342 if(message == null) {
343 if(exc.getCause() != null) {
344 message = exc.getCause().getMessage();
347 message = exc.getClass().getName();
349 LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
350 handleGetConnectionException(active, exc);
352 if(LOGGER.isDebugEnabled()){
353 time = (System.currentTimeMillis() - time);
354 LOGGER.debug("getData processing time : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
358 if(lastException instanceof SQLException){
359 throw (SQLException)lastException;
361 // repackage the exception
362 // you are here because either you run out of available data sources
363 // or the last exception was not of SQLException type.
364 // repackage the exception
365 if(lastException == null) {
366 throw new DBLibException("The operation timed out while waiting to acquire a new connection." );
// NOTE(review): only the message and stack trace are copied into the new exception; the
// original Throwable is not attached as its cause.
368 SQLException exception = new DBLibException(lastException.getMessage());
369 exception.setStackTrace(lastException.getStackTrace());
370 if(lastException.getCause() instanceof SQLException) {
371 throw (SQLException)lastException.getCause();
// Query path without failover: uses only the source at the head of dsQueue and rethrows any
// failure (SQLExceptions as they are, anything else repackaged as a DBLibException).
// Throws DBLibException when no source is queued.
// NOTE(review): the try/finally structure and what is done with "master" are not visible in
// this excerpt. The preferredDS parameter is not used in the visible lines.
377 private CachedRowSet requestDataNoRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
378 if(dsQueue.isEmpty()){
379 LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
380 throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call.");
382 CachedDataSource active = (CachedDataSource) this.dsQueue.peek();
383 long time = System.currentTimeMillis();
385 if(!active.isFabric()) {
386 CachedDataSource master = findMaster();
390 return active.getData(statement, arguments);
391 // } catch(SQLDataException exc){
393 } catch(Throwable exc){
394 String message = exc.getMessage();
396 message = exc.getClass().getName();
397 LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
398 if(exc instanceof SQLException)
399 throw (SQLException)exc;
// NOTE(review): only the message and stack trace are copied; the original Throwable is not
// attached as the cause.
401 DBLibException excptn = new DBLibException(exc.getMessage());
402 excptn.setStackTrace(exc.getStackTrace());
406 if(LOGGER.isDebugEnabled()){
407 time = (System.currentTimeMillis() - time);
408 LOGGER.debug(">> getData : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
415 * @see org.openecomp.sdnc.sli.resource.dblib.DbLibService#writeData(java.lang.String, java.util.ArrayList, java.lang.String)
418 public boolean writeData(String statement, ArrayList<String> arguments, String preferredDS) throws SQLException
420 ArrayList<Object> newList=new ArrayList<Object>();
421 if(arguments != null && !arguments.isEmpty()) {
422 newList.addAll(arguments);
425 return writeDataNoRecovery(statement, newList, preferredDS);
// Looks through a snapshot array of dsQueue for a data source that does not report itself as a
// slave (isSlave() == false) and logs "MASTER not found." in the failure case.
// NOTE(review): most of the body is not visible in this excerpt: how "master" is assigned, why
// it is removed from dsQueue (presumably to be re-inserted so it becomes the queue head), and
// what is returned. Callers in this file treat a non-null result as the source to use —
// TODO confirm against the full method.
428 CachedDataSource findMaster() throws PoolExhaustedException, MySQLNonTransientConnectionException {
429 CachedDataSource master = null;
430 CachedDataSource[] dss = this.dsQueue.toArray(new CachedDataSource[0]);
431 for(int i=0; i<dss.length; i++) {
432 if(!dss[i].isSlave()) {
435 dsQueue.remove(master);
442 LOGGER.warn("MASTER not found.");
// Executes a write statement on the data source at the head of dsQueue.
// The while loop runs once, and is re-armed (initialRequest = true) only from the branch
// that handles error code 1290 / SQLSTATE "HY000"; other failures are rethrown
// (SQLExceptions as they are, anything else repackaged as a DBLibException).
// Throws DBLibException when no source is queued.
// NOTE(review): the log and exception texts below mention "getData" and
// "RequestDataNoRecovery" although this is the write path — they look copied from
// requestDataNoRecovery(). The try/finally structure, the use of "master" and the exact
// retry guard are not visible in this excerpt. preferredDS is unused in the visible lines.
448 private boolean writeDataNoRecovery(String statement, ArrayList<Object> arguments, String preferredDS) throws SQLException {
449 if(dsQueue.isEmpty()){
450 LOGGER.error("Generated alarm: DBResourceManager.getData - No active DB connection pools are available.");
451 throw new DBLibException("No active DB connection pools are available in RequestDataNoRecovery call.");
454 boolean initialRequest = true;
455 boolean retryAllowed = true;
456 CachedDataSource active = (CachedDataSource) this.dsQueue.peek();
457 long time = System.currentTimeMillis();
458 while(initialRequest) {
459 initialRequest = false;
461 if(!active.isFabric()) {
462 CachedDataSource master = findMaster();
468 return active.writeData(statement, arguments);
469 } catch(Throwable exc){
470 String message = exc.getMessage();
472 message = exc.getClass().getName();
473 LOGGER.error("Generated alarm: "+active.getDbConnectionName()+" - "+message);
474 if(exc instanceof SQLException) {
475 SQLException sqlExc = SQLException.class.cast(exc);
476 // handle read-only exception
// Presumably MySQL's "server is running with the --read-only option" error — confirm against
// the MySQL error reference. The failing source is dropped from dsQueue before the retry.
477 if(sqlExc.getErrorCode() == 1290 && "HY000".equals(sqlExc.getSQLState())) {
478 LOGGER.warn("retrying due to: " + sqlExc.getMessage());
479 dsQueue.remove(active);
// Clearing retryAllowed while re-arming the loop suggests a single retry — TODO confirm.
482 retryAllowed = false;
483 initialRequest = true;
487 throw (SQLException)exc;
// NOTE(review): only the message and stack trace are copied; the original Throwable is not
// attached as the cause.
489 DBLibException excptn = new DBLibException(exc.getMessage());
490 excptn.setStackTrace(exc.getStackTrace());
494 if(LOGGER.isDebugEnabled()){
495 time = (System.currentTimeMillis() - time);
496 LOGGER.debug("writeData processing time : "+ active.getDbConnectionName()+" "+time+" miliseconds.");
503 private void setDataSource(CachedDataSource dataSource) {
504 if(dataSource.testConnection(true)){
505 this.dsQueue.add(dataSource);
507 this.broken.add(dataSource);
// Returns a connection from the data source selected via dsQueue.peek()/findMaster(), wrapped
// in a DBLibConnection that also carries the data source it came from.
// Pool exhaustion and MySQLNonTransientConnectionException are reported as
// NoAvailableConnectionsException; other failures are handed to handleGetConnectionException()
// and rethrown as SQLException/DBLibException.
// Throws DBLibException when no source is queued.
// NOTE(review): the loop/try structure, the assignment inside "if(tmpActive != null)" and the
// throw statements after the repackaging lines are not visible in this excerpt.
511 public Connection getConnection() throws SQLException {
512 Throwable lastException = null;
513 CachedDataSource active = null;
515 if(dsQueue.isEmpty()){
516 throw new DBLibException("No active DB connection pools are available in GetConnection call.");
520 active = dsQueue.peek();
521 CachedDataSource tmpActive = findMaster();
// Presumably the master, when found, replaces the queue head as the source to use — confirm.
522 if(tmpActive != null) {
525 return new DBLibConnection(active.getConnection(), active);
526 } catch(javax.sql.rowset.spi.SyncFactoryException exc){
527 LOGGER.debug("Free memory (bytes): " + Runtime.getRuntime().freeMemory());
528 LOGGER.warn("CLASSPATH issue. Allowing retry", exc);
530 } catch(PoolExhaustedException exc) {
531 throw new NoAvailableConnectionsException(exc);
532 } catch(MySQLNonTransientConnectionException exc){
533 throw new NoAvailableConnectionsException(exc);
534 } catch(Exception exc){
537 handleGetConnectionException(active, exc);
// NOTE(review): if this catch belongs to the same try as the dedicated
// MySQLNonTransientConnectionException catch above, the instanceof test below can never be true.
539 if(exc instanceof MySQLNonTransientConnectionException) {
540 throw new NoAvailableConnectionsException(exc);
541 } if(exc instanceof SQLException) {
542 throw (SQLException)exc;
// Only the message and stack trace are copied; the original exception is not set as the cause.
544 DBLibException excptn = new DBLibException(exc.getMessage());
545 excptn.setStackTrace(exc.getStackTrace());
549 } catch (Throwable trwb) {
550 DBLibException excptn = new DBLibException(trwb.getMessage());
551 excptn.setStackTrace(trwb.getStackTrace());
554 if(LOGGER.isDebugEnabled()){
559 if(lastException instanceof SQLException){
560 throw (SQLException)lastException;
562 // repackage the exception
563 if(lastException == null) {
564 throw new DBLibException("The operation timed out while waiting to acquire a new connection." );
566 SQLException exception = new DBLibException(lastException.getMessage());
567 exception.setStackTrace(lastException.getStackTrace());
568 if(lastException.getCause() instanceof SQLException) {
569 // exception.setNextException((SQLException)lastException.getCause());
570 throw (SQLException)lastException.getCause();
// Credential-taking variant of getConnection(): asks the selected data source for a connection
// using the given user name and password. Unlike the no-argument variant, the visible return
// statement hands back the data source's connection directly, without a DBLibConnection wrapper.
// On failure the source is passed to handleGetConnectionException() and the error is rethrown
// as SQLException/DBLibException. Throws DBLibException when no source is queued.
// NOTE(review): the try structure, the assignment inside "if(tmpActive != null)" and the throw
// after the repackaging lines are not visible in this excerpt.
576 public Connection getConnection(String username, String password)
577 throws SQLException {
578 CachedDataSource active = null;
580 if(dsQueue.isEmpty()){
581 throw new DBLibException("No active DB connection pools are available in GetConnection call.");
586 active = dsQueue.peek();
587 CachedDataSource tmpActive = findMaster();
588 if(tmpActive != null) {
591 return active.getConnection(username, password);
592 } catch(Throwable exc){
594 handleGetConnectionException(active, exc);
596 if(exc instanceof SQLException)
597 throw (SQLException)exc;
// Only the message and stack trace are copied; the original Throwable is not set as the cause.
599 DBLibException excptn = new DBLibException(exc.getMessage());
600 excptn.setStackTrace(exc.getStackTrace());
607 throw new DBLibException("No connections available in DBResourceManager in GetConnection call.");
// Takes a failing data source out of service: unless the source refuses to go offline
// (canTakeOffLine() == false), it is removed from dsQueue and added to the "broken" set for
// RecoveryMgr to re-test, and the new queue head (if any) is logged as the active source.
// NOTE(review): the try block, the branch structure (else/return lines) and any use of the
// "removed" flag are not visible in this excerpt. Callers such as testForceRecovery() pass the
// result of dsQueue.peek(), which is null for an empty queue; whether the catch below covers
// the resulting NullPointerException cannot be confirmed from here.
610 private void handleGetConnectionException(CachedDataSource source, Throwable exc) {
612 if(!source.canTakeOffLine())
614 LOGGER.error("Could not switch due to blocking");
618 boolean removed = dsQueue.remove(source);
619 if(!broken.contains(source))
621 if(broken.add(source))
623 LOGGER.warn("DB Recovery: DataSource <" + source.getDbConnectionName() + "> put in the recovery mode. Reason : " + exc.getMessage());
625 LOGGER.warn("Error putting DataSource <" +source.getDbConnectionName()+ "> in recovery mode.");
628 LOGGER.info("DB Recovery: DataSource <" + source.getDbConnectionName() + "> already in recovery queue");
632 if(!dsQueue.isEmpty())
634 LOGGER.warn("DB DataSource <" + dsQueue.peek().getDbConnectionName() + "> became active");
637 } catch (Exception e) {
// Shuts the manager down: walks the queued data sources, raises the "terminating" flag, adds a
// TerminatingCachedDataSource marker to the "broken" set (RecoveryMgr checks for that type) and
// waits up to terminationTimeOut ms for the worker thread to finish.
// NOTE(review): what is done with each "cds" inside the loop, and the try blocks belonging to
// the two catch clauses, are not visible in this excerpt.
642 public void cleanUp() {
643 for(Iterator<CachedDataSource> it=dsQueue.iterator();it.hasNext();){
644 CachedDataSource cds = (CachedDataSource)it.next();
650 this.terminating = true;
654 broken.add( new TerminatingCachedDataSource(null));
655 } catch(Exception exc){
656 LOGGER.error("Waiting for Worker to stop", exc);
// join() returns after the timeout even if the thread is still alive; the log line below
// prints the thread state rather than checking it.
659 worker.join(terminationTimeOut);
660 LOGGER.info("DBResourceManager.RecoveryMgr <"+worker.toString() +"> termination was successful: " + worker.getState());
661 } catch(Exception exc){
662 LOGGER.error("Waiting for Worker thread to terminate ", exc);
666 public static DBResourceManager create(Properties props) throws Exception {
667 DBResourceManager dbmanager = new DBResourceManager(props);
668 dbmanager.config(props);
672 public PrintWriter getLogWriter() throws SQLException {
673 return ((CachedDataSource)this.dsQueue.peek()).getLogWriter();
676 public int getLoginTimeout() throws SQLException {
677 return ((CachedDataSource)this.dsQueue.peek()).getLoginTimeout();
680 public void setLogWriter(PrintWriter out) throws SQLException {
681 ((CachedDataSource)this.dsQueue.peek()).setLogWriter(out);
684 public void setLoginTimeout(int seconds) throws SQLException {
685 ((CachedDataSource)this.dsQueue.peek()).setLoginTimeout(seconds);
688 public void displayState(){
689 if(LOGGER.isDebugEnabled()){
690 LOGGER.debug("POOLS : Active = "+dsQueue.size() + ";\t Broken = "+broken.size());
691 CachedDataSource current = (CachedDataSource)dsQueue.peek();
692 if(current != null) {
693 LOGGER.debug("POOL : Active name = \'"+current.getDbConnectionName()+ "\'");
699 * @see org.openecomp.sdnc.sli.resource.dblib.DbLibService#isActive()
702 public boolean isActive() {
703 return this.dsQueue.size()>0;
706 public String getActiveStatus(){
707 return "Connected: " + dsQueue.size()+"\tIn-recovery: "+broken.size();
// Renders the state of the known data sources as text. Two renderings are visible: HTML table
// rows (a header row of names and a "State:" row) and a plain form ("Name: ...\tState: ...").
// A source is reported as "in recovery" when it is in the "broken" set, as "active" when it is
// the head of dsQueue, and as "standby" when it is elsewhere in dsQueue.
// NOTE(review): the branch on htmlFormat and the else lines are not visible in this excerpt,
// and lines between list.addAll(dsQueue) and the first append are missing (the "in recovery"
// branches suggest broken sources are also added to the list — TODO confirm).
// NOTE(review): data source names are appended to the HTML without escaping.
710 public String getDBStatus(boolean htmlFormat) {
711 StringBuilder buffer = new StringBuilder();
713 ArrayList<CachedDataSource> list = new ArrayList<CachedDataSource>();
714 list.addAll(dsQueue);
718 buffer.append("<tr class=\"headerRow\"><th id=\"header1\">")
719 .append("Name:").append("</th>");
720 for (int i = 0; i < list.size(); i++) {
// Header cell ids continue from "header2" because "header1" is the "Name:" cell.
721 buffer.append("<th id=\"header").append(2 + i).append("\">");
722 buffer.append(((CachedDataSource) list.get(i)).getDbConnectionName()).append("</th>");
724 buffer.append("</tr>");
726 buffer.append("<tr><td>State:</td>");
727 for (int i = 0; i < list.size(); i++) {
728 if (broken.contains(list.get(i))) {
729 buffer.append("<td>in recovery</td>");
731 if (dsQueue.contains(list.get(i))) {
732 if (dsQueue.peek() == list.get(i))
733 buffer.append("<td>active</td>");
735 buffer.append("<td>standby</td>");
738 buffer.append("</tr>");
741 for (int i = 0; i < list.size(); i++) {
742 buffer.append("Name: ").append(((CachedDataSource) list.get(i)).getDbConnectionName());
743 buffer.append("\tState: ");
744 if (broken.contains(list.get(i))) {
745 buffer.append("in recovery");
747 if (dsQueue.contains(list.get(i))) {
748 if (dsQueue.peek() == list.get(i))
749 buffer.append("active");
751 buffer.append("standby");
758 return buffer.toString();
// Wrapper-interface query (same signature as java.sql.Wrapper#isWrapperFor).
// NOTE(review): the body is not visible in this excerpt, so the returned value is unknown here.
761 public boolean isWrapperFor(Class<?> iface) throws SQLException {
// Wrapper-interface accessor (same signature as java.sql.Wrapper#unwrap).
// NOTE(review): the body is not visible in this excerpt, so the returned value is unknown here.
765 public <T> T unwrap(Class<T> iface) throws SQLException {
770 * @return true only when recovery mode is enabled and DB response monitoring is configured
772 public final boolean isMonitorDbResponse() {
773 return recoveryMode && monitorDbResponse;
// NOTE(review): the header of this method is not visible in this excerpt.
// Visible behaviour: takes the current head of dsQueue and reports the same placeholder
// Exception against it five times via handleGetConnectionException(); anything thrown is
// logged at warn level with an empty message. Looks like a test/diagnostic hook — TODO confirm.
777 CachedDataSource obj = dsQueue.peek();
778 Exception ption = new Exception();
780 for(int i=0; i<5; i++)
782 handleGetConnectionException(obj, ption);
784 } catch(Throwable exc){
785 LOGGER.warn("", exc);
// Returns the preferred data source name, computed by getPreferredDataSourceName() with the
// shared dsSelector flag.
// NOTE(review): lines around the return are not visible in this excerpt; there may be a guard
// and a fallback return value — TODO confirm.
789 public String getPreferredDSName(){
791 return getPreferredDataSourceName(dsSelector);
// Picks a data source name from a snapshot of dsQueue. With more than one source, the first and
// last snapshot entries are compared by their monitors' processed-connections counts (delta);
// depending on the outcome the flipper may be toggled and the snapshot reversed, and the name
// of the snapshot head is returned.
// NOTE(review): the branch bodies for the delta comparison and the condition guarding the
// reverse() call are not visible in this excerpt, so the exact selection rule cannot be stated.
// NOTE(review): snapshot.peek() is null when dsQueue is empty, which would make the final line
// throw a NullPointerException unless callers guard against it.
796 public String getPreferredDataSourceName(AtomicBoolean flipper) {
798 LinkedList<CachedDataSource> snapshot = new LinkedList<CachedDataSource>(dsQueue);
799 if(snapshot.size() > 1){
800 CachedDataSource first = snapshot.getFirst();
801 CachedDataSource last = snapshot.getLast();
803 int delta = first.getMonitor().getPorcessedConnectionsCount() - last.getMonitor().getPorcessedConnectionsCount();
806 } else if(delta > 0) {
809 // check the last value and return !last
// NOTE(review): get() followed by getAndSet() is two separate operations, so this toggle is
// not atomic if several threads call it concurrently.
810 flipper.getAndSet(!flipper.get());
814 Collections.reverse(snapshot);
816 return snapshot.peek().getDbConnectionName();
// Parent-logger accessor (same signature as javax.sql.CommonDataSource#getParentLogger).
// NOTE(review): the body is not visible in this excerpt, so what it returns or throws is
// unknown here.
819 public java.util.logging.Logger getParentLogger()
820 throws SQLFeatureNotSupportedException {
// Returns the data source name computed by getMasterDataSourceName() with the shared
// dsSelector flag.
// NOTE(review): lines around the return are not visible in this excerpt; there may be a guard
// and a fallback return value — TODO confirm.
824 public String getMasterName() {
826 return getMasterDataSourceName(dsSelector);
// Picks a data source name from a snapshot of dsQueue. The visible statements are the same as
// in getPreferredDataSourceName(): with more than one source, the first and last snapshot
// entries are compared by their monitors' processed-connections counts (delta), the flipper
// may be toggled and the snapshot reversed, and the name of the snapshot head is returned.
// NOTE(review): the branch bodies and the condition guarding reverse() are not visible in this
// excerpt, so how (or whether) this differs from getPreferredDataSourceName() cannot be told
// from here; if the two are identical one should delegate to the other.
// NOTE(review): snapshot.peek() is null when dsQueue is empty, which would make the final line
// throw a NullPointerException unless callers guard against it.
832 private String getMasterDataSourceName(AtomicBoolean flipper) {
834 LinkedList<CachedDataSource> snapshot = new LinkedList<CachedDataSource>(dsQueue);
835 if(snapshot.size() > 1){
836 CachedDataSource first = snapshot.getFirst();
837 CachedDataSource last = snapshot.getLast();
839 int delta = first.getMonitor().getPorcessedConnectionsCount() - last.getMonitor().getPorcessedConnectionsCount();
842 } else if(delta > 0) {
845 // check the last value and return !last
// NOTE(review): get() followed by getAndSet() is two separate operations, so this toggle is
// not atomic if several threads call it concurrently.
846 flipper.getAndSet(!flipper.get());
850 Collections.reverse(snapshot);
852 return snapshot.peek().getDbConnectionName();
// Timer task that asks the data source at the head of dsQueue for its pool information.
// NOTE(review): only a fragment of this class is visible (no method header, no closing lines);
// whether "ds" is null-checked before use and how the task is scheduled cannot be seen here.
855 class RemindTask extends TimerTask {
857 CachedDataSource ds = dsQueue.peek();
859 ds.getPoolInfo(false);