* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
protected static final int ASSIGNMENT_COMMAS = 2;
protected static final int WHERE_ANDS = 3;
- private Cluster cluster;
+ private Cluster cluster;
/*
* From DataStax
* com.datastax.driver.core.Session
- A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes,
- provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for
+ A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes,
+ provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for
failed query (when it makes sense), etc...
- Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one
+ Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one
keyspace at a time, so one instance per keyspace is necessary.
*/
private Session session;
private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();
private static boolean resetTrigger = false;
private static long nextAvailableReset = 0;
-
+
public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {
this.name = name;
this.cluster = cluster;
owningDAO = aDao;
this.dataClass = dataClass;
}
-
+
// Not used since 2015
// public static void setSessionSlot(Slot slot) {
// sessionSlot = slot;
/**
* Create a PSInfo and create Prepared Statement
- *
+ *
* @param trans
* @param theCQL
* @param loader
psinfos.add(this);
cql = theCQL.trim().toUpperCase();
- if(cql.startsWith("INSERT")) {
+ if (cql.startsWith("INSERT")) {
crud = CRUD.create;
- } else if(cql.startsWith("UPDATE")) {
+ } else if (cql.startsWith("UPDATE")) {
crud = CRUD.update;
- } else if(cql.startsWith("DELETE")) {
+ } else if (cql.startsWith("DELETE")) {
crud = CRUD.delete;
} else {
crud = CRUD.read;
}
-
+
int idx = 0, count=0;
- while((idx=cql.indexOf('?',idx))>=0) {
+ while ((idx=cql.indexOf('?',idx))>=0) {
++idx;
++count;
}
size=count;
}
-
+
public synchronized void reset() {
ps = null;
}
-
+
private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException {
/* From Datastax
- You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe).
+ You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe).
If you call prepare multiple times with the same query string, the driver will log a warning.
*/
- if(ps==null) {
+ if (ps==null) {
TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);
try {
ps = getSession(trans).prepare(cql);
/**
* Execute a Prepared Statement by extracting from DATA object
- *
+ *
* @param trans
* @param text
* @param data
/**
* Execute a Prepared Statement on Object[] key
- *
+ *
* @param trans
* @param text
* @param objs
tt.done();
}
}
-
- /*
+
+ /*
* Note:
- *
+ *
*/
/**
* Execute a Prepared Statement by extracting from DATA object
- *
+ *
* @param trans
* @param text
* @param data
/*
* "execute" (and executeAsync)
* Executes the provided query.
- This method blocks until at least some result has been received from the database. However,
- for SELECT queries, it does not guarantee that the result has been received in full. But it
- does guarantee that some response has been received from the database, and in particular
+ This method blocks until at least some result has been received from the database. However,
+ for SELECT queries, it does not guarantee that the result has been received in full. But it
+ does guarantee that some response has been received from the database, and in particular
guarantee that if the request is invalid, an exception will be thrown by this method.
Parameters:
statement - the CQL query to execute (that can be any Statement).
Returns:
- the result of the query. That result will never be null but can be empty (and will
+ the result of the query. That result will never be null but can be empty (and will
be for any non SELECT query).
*/
return Result.ok(getSession(trans).execute(
/**
* Execute a Prepared Statement on Object[] key
- *
+ *
* @param trans
* @param text
* @param objs
*/
public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {
TimeTaken tt = trans.start(text,Env.REMOTE);
-
+
ResultSet rs;
try {
rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));
-/// TEST CODE for Exception
-// boolean force = true;
-// if(force) {
+/// TEST CODE for Exception
+// boolean force = true;
+// if (force) {
// Map<InetSocketAddress, Throwable> misa = new HashMap<>();
// //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));
// misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));
} finally {
tt.done();
}
-
+
return extract(loader,rs,null /*let Array be created if necessary*/,dflt);
}
-
+
public Result<List<DATA>> read(TRANS trans, String text, DATA data) {
return read(trans,text, loader.extract(data, size, crud));
}
-
+
public Object[] keyFrom(DATA data) {
return loader.extract(data, size, CRUD.delete); // Delete is key only
}
/*
- * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected,
+ * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected,
* but we don't want a data leak if it does. Finalize doesn't have to happen quickly
*/
@Override
@SuppressWarnings("unchecked")
protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {
List<Row> rows = rs.all();
- if(rows.isEmpty()) {
+ if (rows.isEmpty()) {
return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);
} else {
DATA d;
List<DATA> data = indata==null?new ArrayList<>(rows.size()):indata;
-
- for(Row row : rows) {
+
+ for (Row row : rows) {
try {
d = loader.load(dataClass.newInstance(),row);
- if(accept.ok(d)) {
+ if (accept.ok(d)) {
data.add(d);
}
- } catch(Exception e) {
+ } catch (Exception e) {
return Result.err(e);
}
}
return Result.ok(data);
}
}
-
+
private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
//package on purpose
Session session;
long timestamp;
-
+
public ResetRequest(Session session) {
this.session = session;
timestamp = System.currentTimeMillis();
}
}
-
+
public static final void primePSIs(TransStore trans) throws APIException, IOException {
- for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
- if(psi.ps==null) {
+ for (AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
+ if (psi.ps==null) {
psi.ps(trans);
}
}
}
-
+
public final Session getSession(TransStore trans) throws APIException, IOException {
// SessionFilter unused since 2015
// Try to use Trans' session, if exists
-// if(sessionSlot!=null) { // try to get from Trans
+// if (sessionSlot!=null) { // try to get from Trans
// Session sess = trans.get(sessionSlot, null);
-// if(sess!=null) {
+// if (sess!=null) {
// return sess;
// }
// }
-
+
// If there's an owning DAO, use it's session
- if(owningDAO!=null) {
+ if (owningDAO!=null) {
return owningDAO.getSession(trans);
}
-
+
// OK, nothing else works... get our own.
- if(session==null || resetTrigger) {
+ if (session==null || resetTrigger) {
Cluster tempCluster = null;
Session tempSession = null;
try {
synchronized(LOCK) {
boolean reset = false;
- for(ResetRequest r : resetDeque) {
- if(r.session == session) {
- if(r.timestamp>nextAvailableReset) {
+ for (ResetRequest r : resetDeque) {
+ if (r.session == session) {
+ if (r.timestamp>nextAvailableReset) {
reset=true;
nextAvailableReset = System.currentTimeMillis() + 60000;
tempCluster = cluster;
}
}
}
-
- if(reset || session == null) {
+
+ if (reset || session == null) {
TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);
try {
// Note: Maitrayee recommended not closing the cluster, just
// overwrite it. Jonathan 9/30/2016 assuming same for Session
// This was a bad idea. Ran out of File Handles as I suspected, Jonathan
- if(reset) {
- for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
+ if (reset) {
+ for (AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
psi.reset();
}
}
- if(reset || cluster==null) {
+ if (reset || cluster==null) {
cluster = CassAccess.cluster(trans, keyspace);
trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);
}
- if(reset || session==null) {
+ if (reset || session==null) {
session = cluster.connect(keyspace);
trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);
}
try {
resetDeque.clear();
// Not clearing Session/Cluster appears to kill off FileHandles
- if(tempSession!=null && !tempSession.isClosed()) {
+ if (tempSession!=null && !tempSession.isClosed()) {
tempSession.close();
}
- if(tempCluster!=null && !tempCluster.isClosed()) {
+ if (tempCluster!=null && !tempCluster.isClosed()) {
tempCluster.close();
}
} finally {
}
return session;
}
-
+
public final boolean reportPerhapsReset(TransStore trans, Exception e) {
- if(owningDAO!=null) {
+ if (owningDAO!=null) {
return owningDAO.reportPerhapsReset(trans, e);
} else {
boolean rv = false;
- if(CassAccess.isResetException(e)) {
+ if (CassAccess.isResetException(e)) {
trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());
resetDeque.addFirst(new ResetRequest(session));
rv = resetTrigger = true;
- }
+ }
trans.error().log(e);
return rv;
}
}
public void close(TransStore trans) {
- if(owningDAO==null) {
- if(session!=null) {
+ if (owningDAO==null) {
+ if (session!=null) {
TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);
try {
session.close();
protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {
}
-
+
protected interface Accept<DATA> {
public boolean ok(DATA data);
}