* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
protected static final int ASSIGNMENT_COMMAS = 2;
protected static final int WHERE_ANDS = 3;
- private Cluster cluster;
+ private Cluster cluster;
/*
* From DataStax
* com.datastax.driver.core.Session
- A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes,
- provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for
+ A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes,
+ provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for
failed query (when it makes sense), etc...
- Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one
+ Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one
keyspace at a time, so one instance per keyspace is necessary.
*/
private Session session;
private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();
private static boolean resetTrigger = false;
private static long nextAvailableReset = 0;
-
+
public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {
this.name = name;
this.cluster = cluster;
owningDAO = aDao;
this.dataClass = dataClass;
}
-
+
// Not used since 2015
// public static void setSessionSlot(Slot slot) {
// sessionSlot = slot;
/**
* Create a PSInfo and create Prepared Statement
- *
+ *
* @param trans
* @param theCQL
* @param loader
} else {
crud = CRUD.read;
}
-
+
int idx = 0, count=0;
while ((idx=cql.indexOf('?',idx))>=0) {
++idx;
}
size=count;
}
-
+
public synchronized void reset() {
ps = null;
}
-
+
private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException {
/* From Datastax
- You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe).
+ You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe).
If you call prepare multiple times with the same query string, the driver will log a warning.
*/
if (ps==null) {
/**
* Execute a Prepared Statement by extracting from DATA object
- *
+ *
* @param trans
* @param text
* @param data
/**
* Execute a Prepared Statement on Object[] key
- *
+ *
* @param trans
* @param text
* @param objs
tt.done();
}
}
-
- /*
+
+ /*
* Note:
- *
+ *
*/
/**
* Execute a Prepared Statement by extracting from DATA object
- *
+ *
* @param trans
* @param text
* @param data
/*
* "execute" (and executeAsync)
* Executes the provided query.
- This method blocks until at least some result has been received from the database. However,
- for SELECT queries, it does not guarantee that the result has been received in full. But it
- does guarantee that some response has been received from the database, and in particular
+ This method blocks until at least some result has been received from the database. However,
+ for SELECT queries, it does not guarantee that the result has been received in full. But it
+ does guarantee that some response has been received from the database, and in particular
guarantee that if the request is invalid, an exception will be thrown by this method.
Parameters:
statement - the CQL query to execute (that can be any Statement).
Returns:
- the result of the query. That result will never be null but can be empty (and will
+ the result of the query. That result will never be null but can be empty (and will
be for any non SELECT query).
*/
return Result.ok(getSession(trans).execute(
/**
* Execute a Prepared Statement on Object[] key
- *
+ *
* @param trans
* @param text
* @param objs
*/
public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {
TimeTaken tt = trans.start(text,Env.REMOTE);
-
+
ResultSet rs;
try {
rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));
-/// TEST CODE for Exception
-// boolean force = true;
+/// TEST CODE for Exception
+// boolean force = true;
// if (force) {
// Map<InetSocketAddress, Throwable> misa = new HashMap<>();
// //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));
} finally {
tt.done();
}
-
+
return extract(loader,rs,null /*let Array be created if necessary*/,dflt);
}
-
+
public Result<List<DATA>> read(TRANS trans, String text, DATA data) {
return read(trans,text, loader.extract(data, size, crud));
}
-
+
public Object[] keyFrom(DATA data) {
return loader.extract(data, size, CRUD.delete); // Delete is key only
}
/*
- * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected,
+ * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected,
* but we don't want a data leak if it does. Finalize doesn't have to happen quickly
*/
@Override
} else {
DATA d;
List<DATA> data = indata==null?new ArrayList<>(rows.size()):indata;
-
+
for (Row row : rows) {
try {
d = loader.load(dataClass.newInstance(),row);
return Result.ok(data);
}
}
-
+
private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
//package on purpose
Session session;
long timestamp;
-
+
public ResetRequest(Session session) {
this.session = session;
timestamp = System.currentTimeMillis();
}
}
-
+
public static final void primePSIs(TransStore trans) throws APIException, IOException {
for (AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
if (psi.ps==null) {
}
}
}
-
+
public final Session getSession(TransStore trans) throws APIException, IOException {
// SessionFilter unused since 2015
// Try to use Trans' session, if exists
// return sess;
// }
// }
-
+
// If there's an owning DAO, use it's session
- if (owningDAO!=null) {
+ if (owningDAO!=null) {
return owningDAO.getSession(trans);
}
-
+
// OK, nothing else works... get our own.
if (session==null || resetTrigger) {
Cluster tempCluster = null;
}
}
}
-
+
if (reset || session == null) {
TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);
try {
}
return session;
}
-
+
public final boolean reportPerhapsReset(TransStore trans, Exception e) {
if (owningDAO!=null) {
return owningDAO.reportPerhapsReset(trans, e);
trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());
resetDeque.addFirst(new ResetRequest(session));
rv = resetTrigger = true;
- }
+ }
trans.error().log(e);
return rv;
}
protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {
}
-
+
protected interface Accept<DATA> {
public boolean ok(DATA data);
}