X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=auth%2Fauth-cass%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Faaf%2Fauth%2Fdao%2FAbsCassDAO.java;h=de0efdfd84a794ce1ba88f6ec27513a815e94847;hb=1296352d8eafee57f982a4342ad79ada4aa56d28;hp=89fb12fe9d3a62acd47fd6c969f7e1440943cf4b;hpb=d86dd644e1543743cfcbc83b230bd4f2cb2e5d4e;p=aaf%2Fauthz.git diff --git a/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/AbsCassDAO.java b/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/AbsCassDAO.java index 89fb12fe..de0efdfd 100644 --- a/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/AbsCassDAO.java +++ b/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/AbsCassDAO.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -45,459 +45,459 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.DriverException; public abstract class AbsCassDAO { - protected static final char DOT = '.'; - protected static final char DOT_PLUS_ONE = '.'+1; - protected static final String FIRST_CHAR = Character.toString((char)0); - protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE); - protected static final int FIELD_COMMAS = 0; - protected static final int QUESTION_COMMAS = 1; - protected static final int ASSIGNMENT_COMMAS = 2; - protected static final int WHERE_ANDS = 3; - - private Cluster cluster; - /* - * From DataStax - * com.datastax.driver.core.Session - A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes, - provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for - failed query (when it makes sense), etc... - Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one - keyspace at a time, so one instance per keyspace is necessary. - */ - private Session session; - private final String keyspace; - // If this is null, then we own session - private final AbsCassDAO owningDAO; - protected Class dataClass; - private final String name; -// private static Slot sessionSlot; // not used since 2015 - private static final ArrayList.PSInfo> psinfos = new ArrayList.PSInfo>(); - private static final List EMPTY = new ArrayList(0); - private static final Deque resetDeque = new ConcurrentLinkedDeque(); - private static boolean resetTrigger = false; - private static long nextAvailableReset = 0; - - public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class dataClass) { - this.name = name; - this.cluster = cluster; - this.keyspace = keyspace; - owningDAO = null; // we own session - session = null; - this.dataClass = dataClass; - } - - public AbsCassDAO(TRANS trans, String name, AbsCassDAO aDao, Class dataClass) { - this.name = name; - cluster = aDao.cluster; - keyspace = aDao.keyspace; - session = null; - // We do not own session - owningDAO = aDao; - this.dataClass = dataClass; - } - + protected static final char DOT = '.'; + protected static final char DOT_PLUS_ONE = '.'+1; + protected static final String FIRST_CHAR = Character.toString((char)0); + protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE); + protected static final int FIELD_COMMAS = 0; + protected static final int QUESTION_COMMAS = 1; + protected static final int ASSIGNMENT_COMMAS = 2; + protected static final int WHERE_ANDS = 3; + + private Cluster cluster; + /* + * From DataStax + * com.datastax.driver.core.Session + A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes, + provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for + failed query (when it makes sense), etc... + Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one + keyspace at a time, so one instance per keyspace is necessary. + */ + private Session session; + private final String keyspace; + // If this is null, then we own session + private final AbsCassDAO owningDAO; + protected Class dataClass; + private final String name; +// private static Slot sessionSlot; // not used since 2015 + private static final ArrayList.PSInfo> psinfos = new ArrayList<>(); + private static final List EMPTY = new ArrayList<>(0); + private static final Deque resetDeque = new ConcurrentLinkedDeque(); + private static boolean resetTrigger = false; + private static long nextAvailableReset = 0; + + public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class dataClass) { + this.name = name; + this.cluster = cluster; + this.keyspace = keyspace; + owningDAO = null; // we own session + session = null; + this.dataClass = dataClass; + } + + public AbsCassDAO(TRANS trans, String name, AbsCassDAO aDao, Class dataClass) { + this.name = name; + cluster = aDao.cluster; + keyspace = aDao.keyspace; + session = null; + // We do not own session + owningDAO = aDao; + this.dataClass = dataClass; + } + // Not used since 2015 -// public static void setSessionSlot(Slot slot) { -// sessionSlot = slot; -// } - - //Note: Lower case ON PURPOSE. These names used to create History Messages - public enum CRUD { - create,read,update,delete; - } - - public class PSInfo { - private PreparedStatement ps; - private final int size; - private final Loader loader; - private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader - private final String cql; - private final ConsistencyLevel consistency; - - - /** - * Create a PSInfo and create Prepared Statement - * - * @param trans - * @param theCQL - * @param loader - */ - public PSInfo(TRANS trans, String theCQL, Loader loader, ConsistencyLevel consistency) { - this.loader = loader; - this.consistency=consistency; - psinfos.add(this); - - cql = theCQL.trim().toUpperCase(); - if(cql.startsWith("INSERT")) { - crud = CRUD.create; - } else if(cql.startsWith("UPDATE")) { - crud = CRUD.update; - } else if(cql.startsWith("DELETE")) { - crud = CRUD.delete; - } else { - crud = CRUD.read; - } - - int idx = 0, count=0; - while((idx=cql.indexOf('?',idx))>=0) { - ++idx; - ++count; - } - size=count; - } - - public synchronized void reset() { - ps = null; - } - - private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException { - /* From Datastax - You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe). - If you call prepare multiple times with the same query string, the driver will log a warning. - */ - if(ps==null) { - TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB); - try { - ps = getSession(trans).prepare(cql); - ps.setConsistencyLevel(consistency); - } catch (DriverException e) { - reportPerhapsReset(trans,e); - throw e; - } finally { - tt.done(); - } - } - // BoundStatements are NOT threadsafe... need a new one each time. - return new BoundStatement(ps); - } - - /** - * Execute a Prepared Statement by extracting from DATA object - * - * @param trans - * @param text - * @param data - * @return - */ - public Result execAsync(TRANS trans, String text, DATA data) { - TimeTaken tt = trans.start(text, Env.REMOTE); - try { - return Result.ok(getSession(trans).executeAsync( - ps(trans).bind(loader.extract(data, size, crud)))); - } catch (DriverException | APIException | IOException e) { - AbsCassDAO.this.reportPerhapsReset(trans,e); - return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); - } finally { - tt.done(); - } - } - - /** - * Execute a Prepared Statement on Object[] key - * - * @param trans - * @param text - * @param objs - * @return - */ - public Result execAsync(TRANS trans, String text, Object ... objs) { - TimeTaken tt = trans.start(text, Env.REMOTE); - try { - return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs))); - } catch (DriverException | APIException | IOException e) { - AbsCassDAO.this.reportPerhapsReset(trans,e); - return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); - } finally { - tt.done(); - } - } - - /* - * Note: - * - */ - - /** - * Execute a Prepared Statement by extracting from DATA object - * - * @param trans - * @param text - * @param data - * @return - */ - public Result exec(TRANS trans, String text, DATA data) { - TimeTaken tt = trans.start(text, Env.REMOTE); - try { - /* - * "execute" (and executeAsync) - * Executes the provided query. - This method blocks until at least some result has been received from the database. However, - for SELECT queries, it does not guarantee that the result has been received in full. But it - does guarantee that some response has been received from the database, and in particular - guarantee that if the request is invalid, an exception will be thrown by this method. - - Parameters: - statement - the CQL query to execute (that can be any Statement). - Returns: - the result of the query. That result will never be null but can be empty (and will - be for any non SELECT query). - */ - return Result.ok(getSession(trans).execute( - ps(trans).bind(loader.extract(data, size, crud)))); - } catch (DriverException | APIException | IOException e) { - AbsCassDAO.this.reportPerhapsReset(trans,e); - return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); - } finally { - tt.done(); - } - } - - /** - * Execute a Prepared Statement on Object[] key - * - * @param trans - * @param text - * @param objs - * @return - */ - public Result exec(TRANS trans, String text, Object ... objs) { - TimeTaken tt = trans.start(text, Env.REMOTE); - try { - return Result.ok(getSession(trans).execute(ps(trans).bind(objs))); - } catch (DriverException | APIException | IOException e) { - AbsCassDAO.this.reportPerhapsReset(trans,e); - return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); - } finally { - tt.done(); - } - } - - /** - * Read the Data from Cassandra given a Prepared Statement (defined by the - * DAO Instance) - * - * This is common behavior among all DAOs. - * @throws DAOException - */ - public Result> read(TRANS trans, String text, Object[] key) { - TimeTaken tt = trans.start(text,Env.REMOTE); - - ResultSet rs; - try { - rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key)); -/// TEST CODE for Exception -// boolean force = true; -// if(force) { -// Map misa = new HashMap(); -// //misa.put(new InetSocketAddress(444),new Exception("no host was tried")); -// misa.put(new InetSocketAddress(444),new Exception("Connection has been closed")); -// throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa); -//// throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried"); -// } +// public static void setSessionSlot(Slot slot) { +// sessionSlot = slot; +// } + + //Note: Lower case ON PURPOSE. These names used to create History Messages + public enum CRUD { + create,read,update,delete; + } + + public class PSInfo { + private PreparedStatement ps; + private final int size; + private final Loader loader; + private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader + private final String cql; + private final ConsistencyLevel consistency; + + + /** + * Create a PSInfo and create Prepared Statement + * + * @param trans + * @param theCQL + * @param loader + */ + public PSInfo(TRANS trans, String theCQL, Loader loader, ConsistencyLevel consistency) { + this.loader = loader; + this.consistency=consistency; + psinfos.add(this); + + cql = theCQL.trim().toUpperCase(); + if (cql.startsWith("INSERT")) { + crud = CRUD.create; + } else if (cql.startsWith("UPDATE")) { + crud = CRUD.update; + } else if (cql.startsWith("DELETE")) { + crud = CRUD.delete; + } else { + crud = CRUD.read; + } + + int idx = 0, count=0; + while ((idx=cql.indexOf('?',idx))>=0) { + ++idx; + ++count; + } + size=count; + } + + public synchronized void reset() { + ps = null; + } + + private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException { + /* From Datastax + You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe). + If you call prepare multiple times with the same query string, the driver will log a warning. + */ + if (ps==null) { + TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB); + try { + ps = getSession(trans).prepare(cql); + ps.setConsistencyLevel(consistency); + } catch (DriverException e) { + reportPerhapsReset(trans,e); + throw e; + } finally { + tt.done(); + } + } + // BoundStatements are NOT threadsafe... need a new one each time. + return new BoundStatement(ps); + } + + /** + * Execute a Prepared Statement by extracting from DATA object + * + * @param trans + * @param text + * @param data + * @return + */ + public Result execAsync(TRANS trans, String text, DATA data) { + TimeTaken tt = trans.start(text, Env.REMOTE); + try { + return Result.ok(getSession(trans).executeAsync( + ps(trans).bind(loader.extract(data, size, crud)))); + } catch (DriverException | APIException | IOException e) { + AbsCassDAO.this.reportPerhapsReset(trans,e); + return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); + } finally { + tt.done(); + } + } + + /** + * Execute a Prepared Statement on Object[] key + * + * @param trans + * @param text + * @param objs + * @return + */ + public Result execAsync(TRANS trans, String text, Object ... objs) { + TimeTaken tt = trans.start(text, Env.REMOTE); + try { + return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs))); + } catch (DriverException | APIException | IOException e) { + AbsCassDAO.this.reportPerhapsReset(trans,e); + return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); + } finally { + tt.done(); + } + } + + /* + * Note: + * + */ + + /** + * Execute a Prepared Statement by extracting from DATA object + * + * @param trans + * @param text + * @param data + * @return + */ + public Result exec(TRANS trans, String text, DATA data) { + TimeTaken tt = trans.start(text, Env.REMOTE); + try { + /* + * "execute" (and executeAsync) + * Executes the provided query. + This method blocks until at least some result has been received from the database. However, + for SELECT queries, it does not guarantee that the result has been received in full. But it + does guarantee that some response has been received from the database, and in particular + guarantee that if the request is invalid, an exception will be thrown by this method. + + Parameters: + statement - the CQL query to execute (that can be any Statement). + Returns: + the result of the query. That result will never be null but can be empty (and will + be for any non SELECT query). + */ + return Result.ok(getSession(trans).execute( + ps(trans).bind(loader.extract(data, size, crud)))); + } catch (DriverException | APIException | IOException e) { + AbsCassDAO.this.reportPerhapsReset(trans,e); + return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); + } finally { + tt.done(); + } + } + + /** + * Execute a Prepared Statement on Object[] key + * + * @param trans + * @param text + * @param objs + * @return + */ + public Result exec(TRANS trans, String text, Object ... objs) { + TimeTaken tt = trans.start(text, Env.REMOTE); + try { + return Result.ok(getSession(trans).execute(ps(trans).bind(objs))); + } catch (DriverException | APIException | IOException e) { + AbsCassDAO.this.reportPerhapsReset(trans,e); + return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); + } finally { + tt.done(); + } + } + + /** + * Read the Data from Cassandra given a Prepared Statement (defined by the + * DAO Instance) + * + * This is common behavior among all DAOs. + * @throws DAOException + */ + public Result> read(TRANS trans, String text, Object[] key) { + TimeTaken tt = trans.start(text,Env.REMOTE); + + ResultSet rs; + try { + rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key)); +/// TEST CODE for Exception +// boolean force = true; +// if (force) { +// Map misa = new HashMap<>(); +// //misa.put(new InetSocketAddress(444),new Exception("no host was tried")); +// misa.put(new InetSocketAddress(444),new Exception("Connection has been closed")); +// throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa); +//// throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried"); +// } //// END TEST CODE - } catch (DriverException | APIException | IOException e) { - AbsCassDAO.this.reportPerhapsReset(trans,e); - return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); - } finally { - tt.done(); - } - - return extract(loader,rs,null /*let Array be created if necessary*/,dflt); - } - - public Result> read(TRANS trans, String text, DATA data) { - return read(trans,text, loader.extract(data, size, crud)); - } - - public Object[] keyFrom(DATA data) { - return loader.extract(data, size, CRUD.delete); // Delete is key only - } - - /* - * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected, - * but we don't want a data leak if it does. Finalize doesn't have to happen quickly - */ - @Override - protected void finalize() throws Throwable { - psinfos.remove(this); - } - } - - protected final Accept dflt = new Accept() { - @Override - public boolean ok(DATA data) { - return true; - } - }; - - - @SuppressWarnings("unchecked") + } catch (DriverException | APIException | IOException e) { + AbsCassDAO.this.reportPerhapsReset(trans,e); + return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); + } finally { + tt.done(); + } + + return extract(loader,rs,null /*let Array be created if necessary*/,dflt); + } + + public Result> read(TRANS trans, String text, DATA data) { + return read(trans,text, loader.extract(data, size, crud)); + } + + public Object[] keyFrom(DATA data) { + return loader.extract(data, size, CRUD.delete); // Delete is key only + } + + /* + * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected, + * but we don't want a data leak if it does. Finalize doesn't have to happen quickly + */ + @Override + protected void finalize() throws Throwable { + psinfos.remove(this); + } + } + + protected final Accept dflt = new Accept() { + @Override + public boolean ok(DATA data) { + return true; + } + }; + + + @SuppressWarnings("unchecked") protected final Result> extract(Loader loader, ResultSet rs, List indata, Accept accept) { - List rows = rs.all(); - if(rows.isEmpty()) { - return Result.ok((List)EMPTY); // Result sets now .emptyList(true); - } else { - DATA d; - List data = indata==null?new ArrayList(rows.size()):indata; - - for(Row row : rows) { - try { - d = loader.load(dataClass.newInstance(),row); - if(accept.ok(d)) { - data.add(d); - } - } catch(Exception e) { - return Result.err(e); - } - } - return Result.ok(data); - } + List rows = rs.all(); + if (rows.isEmpty()) { + return Result.ok((List)EMPTY); // Result sets now .emptyList(true); + } else { + DATA d; + List data = indata==null?new ArrayList<>(rows.size()):indata; + + for (Row row : rows) { + try { + d = loader.load(dataClass.newInstance(),row); + if (accept.ok(d)) { + data.add(d); + } + } catch (Exception e) { + return Result.err(e); + } + } + return Result.ok(data); + } + } + + private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created"; + private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created"; + private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session"; + private static final Object LOCK = new Object(); + + private static class ResetRequest { + //package on purpose + Session session; + long timestamp; + + public ResetRequest(Session session) { + this.session = session; + timestamp = System.currentTimeMillis(); + } + } + + + public static final void primePSIs(TransStore trans) throws APIException, IOException { + for (AbsCassDAO.PSInfo psi : psinfos) { + if (psi.ps==null) { + psi.ps(trans); + } + } + } + + public final Session getSession(TransStore trans) throws APIException, IOException { + // SessionFilter unused since 2015 + // Try to use Trans' session, if exists +// if (sessionSlot!=null) { // try to get from Trans +// Session sess = trans.get(sessionSlot, null); +// if (sess!=null) { +// return sess; +// } +// } + + // If there's an owning DAO, use it's session + if (owningDAO!=null) { + return owningDAO.getSession(trans); + } + + // OK, nothing else works... get our own. + if (session==null || resetTrigger) { + Cluster tempCluster = null; + Session tempSession = null; + try { + synchronized(LOCK) { + boolean reset = false; + for (ResetRequest r : resetDeque) { + if (r.session == session) { + if (r.timestamp>nextAvailableReset) { + reset=true; + nextAvailableReset = System.currentTimeMillis() + 60000; + tempCluster = cluster; + tempSession = session; + break; + } else { + trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset"); + } + } + } + + if (reset || session == null) { + TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB); + try { + // Note: Maitrayee recommended not closing the cluster, just + // overwrite it. Jonathan 9/30/2016 assuming same for Session + // This was a bad idea. Ran out of File Handles as I suspected, Jonathan + if (reset) { + for (AbsCassDAO.PSInfo psi : psinfos) { + psi.reset(); + } + } + if (reset || cluster==null) { + cluster = CassAccess.cluster(trans, keyspace); + trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED); + } + if (reset || session==null) { + session = cluster.connect(keyspace); + trans.warn().log(NEW_CASSANDRA_SESSION_CREATED); + } + } finally { + resetTrigger=false; + tt.done(); + } + } + } + } finally { + TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB); + try { + resetDeque.clear(); + // Not clearing Session/Cluster appears to kill off FileHandles + if (tempSession!=null && !tempSession.isClosed()) { + tempSession.close(); + } + if (tempCluster!=null && !tempCluster.isClosed()) { + tempCluster.close(); + } + } finally { + tt.done(); + } + } + } + return session; + } + + public final boolean reportPerhapsReset(TransStore trans, Exception e) { + if (owningDAO!=null) { + return owningDAO.reportPerhapsReset(trans, e); + } else { + boolean rv = false; + if (CassAccess.isResetException(e)) { + trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName()); + resetDeque.addFirst(new ResetRequest(session)); + rv = resetTrigger = true; + } + trans.error().log(e); + return rv; + } + } + + public void close(TransStore trans) { + if (owningDAO==null) { + if (session!=null) { + TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB); + try { + session.close(); + } finally { + tt.done(); + } + session = null; + } else { + trans.debug().log("close called(), Session already closed"); + } + } else { + owningDAO.close(trans); + } + } + + protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) { + } + + protected interface Accept { + public boolean ok(DATA data); } - - private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created"; - private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created"; - private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session"; - private static final Object LOCK = new Object(); - - private static class ResetRequest { - //package on purpose - Session session; - long timestamp; - - public ResetRequest(Session session) { - this.session = session; - timestamp = System.currentTimeMillis(); - } - } - - - public static final void primePSIs(TransStore trans) throws APIException, IOException { - for(AbsCassDAO.PSInfo psi : psinfos) { - if(psi.ps==null) { - psi.ps(trans); - } - } - } - - public final Session getSession(TransStore trans) throws APIException, IOException { - // SessionFilter unused since 2015 - // Try to use Trans' session, if exists -// if(sessionSlot!=null) { // try to get from Trans -// Session sess = trans.get(sessionSlot, null); -// if(sess!=null) { -// return sess; -// } -// } - - // If there's an owning DAO, use it's session - if(owningDAO!=null) { - return owningDAO.getSession(trans); - } - - // OK, nothing else works... get our own. - if(session==null || resetTrigger) { - Cluster tempCluster = null; - Session tempSession = null; - try { - synchronized(LOCK) { - boolean reset = false; - for(ResetRequest r : resetDeque) { - if(r.session == session) { - if(r.timestamp>nextAvailableReset) { - reset=true; - nextAvailableReset = System.currentTimeMillis() + 60000; - tempCluster = cluster; - tempSession = session; - break; - } else { - trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset"); - } - } - } - - if(reset || session == null) { - TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB); - try { - // Note: Maitrayee recommended not closing the cluster, just - // overwrite it. Jonathan 9/30/2016 assuming same for Session - // This was a bad idea. Ran out of File Handles as I suspected, Jonathan - if(reset) { - for(AbsCassDAO.PSInfo psi : psinfos) { - psi.reset(); - } - } - if(reset || cluster==null) { - cluster = CassAccess.cluster(trans, keyspace); - trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED); - } - if(reset || session==null) { - session = cluster.connect(keyspace); - trans.warn().log(NEW_CASSANDRA_SESSION_CREATED); - } - } finally { - resetTrigger=false; - tt.done(); - } - } - } - } finally { - TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB); - try { - resetDeque.clear(); - // Not clearing Session/Cluster appears to kill off FileHandles - if(tempSession!=null && !tempSession.isClosed()) { - tempSession.close(); - } - if(tempCluster!=null && !tempCluster.isClosed()) { - tempCluster.close(); - } - } finally { - tt.done(); - } - } - } - return session; - } - - public final boolean reportPerhapsReset(TransStore trans, Exception e) { - if(owningDAO!=null) { - return owningDAO.reportPerhapsReset(trans, e); - } else { - boolean rv = false; - if(CassAccess.isResetException(e)) { - trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName()); - resetDeque.addFirst(new ResetRequest(session)); - rv = resetTrigger = true; - } - trans.error().log(e); - return rv; - } - } - - public void close(TransStore trans) { - if(owningDAO==null) { - if(session!=null) { - TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB); - try { - session.close(); - } finally { - tt.done(); - } - session = null; - } else { - trans.debug().log("close called(), Session already closed"); - } - } else { - owningDAO.close(trans); - } - } - - protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) { - } - - protected interface Accept { - public boolean ok(DATA data); - } }