2 * ============LICENSE_START====================================================
4 * ===========================================================================
5 * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6 * ===========================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END====================================================
22 package org.onap.aaf.auth.dao;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Deque;
27 import java.util.List;
28 import java.util.concurrent.ConcurrentLinkedDeque;
30 import org.onap.aaf.auth.dao.cass.Status;
31 import org.onap.aaf.auth.layer.Result;
32 import org.onap.aaf.misc.env.APIException;
33 import org.onap.aaf.misc.env.Env;
34 import org.onap.aaf.misc.env.TimeTaken;
35 import org.onap.aaf.misc.env.TransStore;
37 import com.datastax.driver.core.BoundStatement;
38 import com.datastax.driver.core.Cluster;
39 import com.datastax.driver.core.ConsistencyLevel;
40 import com.datastax.driver.core.PreparedStatement;
41 import com.datastax.driver.core.ResultSet;
42 import com.datastax.driver.core.ResultSetFuture;
43 import com.datastax.driver.core.Row;
44 import com.datastax.driver.core.Session;
45 import com.datastax.driver.core.exceptions.DriverException;
47 public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {
48 protected static final char DOT = '.';
49 protected static final char DOT_PLUS_ONE = '.'+1;
50 protected static final String FIRST_CHAR = Character.toString((char)0);
51 protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);
52 protected static final int FIELD_COMMAS = 0;
53 protected static final int QUESTION_COMMAS = 1;
54 protected static final int ASSIGNMENT_COMMAS = 2;
55 protected static final int WHERE_ANDS = 3;
57 private Cluster cluster;
60 * com.datastax.driver.core.Session
61 A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes,
62 provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for
63 failed query (when it makes sense), etc...
64 Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one
65 keyspace at a time, so one instance per keyspace is necessary.
67 private Session session;
68 private final String keyspace;
69 // If this is null, then we own session
70 private final AbsCassDAO<TRANS,?> owningDAO;
71 protected Class<DATA> dataClass;
72 private final String name;
73 // private static Slot sessionSlot; // not used since 2015
74 private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo>();
75 private static final List<Object> EMPTY = new ArrayList<Object>(0);
76 private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();
77 private static boolean resetTrigger = false;
78 private static long nextAvailableReset = 0;
80 public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {
82 this.cluster = cluster;
83 this.keyspace = keyspace;
84 owningDAO = null; // we own session
86 this.dataClass = dataClass;
89 public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {
91 cluster = aDao.cluster;
92 keyspace = aDao.keyspace;
94 // We do not own session
96 this.dataClass = dataClass;
99 // Not used since 2015
100 // public static void setSessionSlot(Slot slot) {
101 // sessionSlot = slot;
104 //Note: Lower case ON PURPOSE. These names used to create History Messages
106 create,read,update,delete;
109 public class PSInfo {
110 private PreparedStatement ps;
111 private final int size;
112 private final Loader<DATA> loader;
113 private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader
114 private final String cql;
115 private final ConsistencyLevel consistency;
119 * Create a PSInfo and create Prepared Statement
125 public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {
126 this.loader = loader;
127 this.consistency=consistency;
130 cql = theCQL.trim().toUpperCase();
131 if(cql.startsWith("INSERT")) {
133 } else if(cql.startsWith("UPDATE")) {
135 } else if(cql.startsWith("DELETE")) {
141 int idx = 0, count=0;
142 while((idx=cql.indexOf('?',idx))>=0) {
149 public synchronized void reset() {
153 private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException {
155 You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe).
156 If you call prepare multiple times with the same query string, the driver will log a warning.
159 TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);
161 ps = getSession(trans).prepare(cql);
162 ps.setConsistencyLevel(consistency);
163 } catch (DriverException e) {
164 reportPerhapsReset(trans,e);
170 // BoundStatements are NOT threadsafe... need a new one each time.
171 return new BoundStatement(ps);
175 * Execute a Prepared Statement by extracting from DATA object
182 public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {
183 TimeTaken tt = trans.start(text, Env.REMOTE);
185 return Result.ok(getSession(trans).executeAsync(
186 ps(trans).bind(loader.extract(data, size, crud))));
187 } catch (DriverException | APIException | IOException e) {
188 AbsCassDAO.this.reportPerhapsReset(trans,e);
189 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
196 * Execute a Prepared Statement on Object[] key
203 public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {
204 TimeTaken tt = trans.start(text, Env.REMOTE);
206 return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));
207 } catch (DriverException | APIException | IOException e) {
208 AbsCassDAO.this.reportPerhapsReset(trans,e);
209 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
221 * Execute a Prepared Statement by extracting from DATA object
228 public Result<ResultSet> exec(TRANS trans, String text, DATA data) {
229 TimeTaken tt = trans.start(text, Env.REMOTE);
232 * "execute" (and executeAsync)
233 * Executes the provided query.
234 This method blocks until at least some result has been received from the database. However,
235 for SELECT queries, it does not guarantee that the result has been received in full. But it
236 does guarantee that some response has been received from the database, and in particular
237 guarantee that if the request is invalid, an exception will be thrown by this method.
240 statement - the CQL query to execute (that can be any Statement).
242 the result of the query. That result will never be null but can be empty (and will
243 be for any non SELECT query).
245 return Result.ok(getSession(trans).execute(
246 ps(trans).bind(loader.extract(data, size, crud))));
247 } catch (DriverException | APIException | IOException e) {
248 AbsCassDAO.this.reportPerhapsReset(trans,e);
249 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
256 * Execute a Prepared Statement on Object[] key
263 public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {
264 TimeTaken tt = trans.start(text, Env.REMOTE);
266 return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));
267 } catch (DriverException | APIException | IOException e) {
268 AbsCassDAO.this.reportPerhapsReset(trans,e);
269 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
276 * Read the Data from Cassandra given a Prepared Statement (defined by the
279 * This is common behavior among all DAOs.
280 * @throws DAOException
282 public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {
283 TimeTaken tt = trans.start(text,Env.REMOTE);
287 rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));
288 /// TEST CODE for Exception
289 // boolean force = true;
291 // Map<InetSocketAddress, Throwable> misa = new HashMap<InetSocketAddress,Throwable>();
292 // //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));
293 // misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));
294 // throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);
295 //// throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");
298 } catch (DriverException | APIException | IOException e) {
299 AbsCassDAO.this.reportPerhapsReset(trans,e);
300 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
305 return extract(loader,rs,null /*let Array be created if necessary*/,dflt);
308 public Result<List<DATA>> read(TRANS trans, String text, DATA data) {
309 return read(trans,text, loader.extract(data, size, crud));
312 public Object[] keyFrom(DATA data) {
313 return loader.extract(data, size, CRUD.delete); // Delete is key only
317 * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected,
318 * but we don't want a data leak if it does. Finalize doesn't have to happen quickly
321 protected void finalize() throws Throwable {
322 psinfos.remove(this);
326 protected final Accept<DATA> dflt = new Accept<DATA>() {
328 public boolean ok(DATA data) {
334 @SuppressWarnings("unchecked")
335 protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {
336 List<Row> rows = rs.all();
338 return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);
341 List<DATA> data = indata==null?new ArrayList<DATA>(rows.size()):indata;
343 for(Row row : rows) {
345 d = loader.load(dataClass.newInstance(),row);
349 } catch(Exception e) {
350 return Result.err(e);
353 return Result.ok(data);
357 private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
358 private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
359 private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
360 private static final Object LOCK = new Object();
362 private static class ResetRequest {
367 public ResetRequest(Session session) {
368 this.session = session;
369 timestamp = System.currentTimeMillis();
374 public static final void primePSIs(TransStore trans) throws APIException, IOException {
375 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
382 public final Session getSession(TransStore trans) throws APIException, IOException {
383 // SessionFilter unused since 2015
384 // Try to use Trans' session, if exists
385 // if(sessionSlot!=null) { // try to get from Trans
386 // Session sess = trans.get(sessionSlot, null);
392 // If there's an owning DAO, use it's session
393 if(owningDAO!=null) {
394 return owningDAO.getSession(trans);
397 // OK, nothing else works... get our own.
398 if(session==null || resetTrigger) {
399 Cluster tempCluster = null;
400 Session tempSession = null;
403 boolean reset = false;
404 for(ResetRequest r : resetDeque) {
405 if(r.session == session) {
406 if(r.timestamp>nextAvailableReset) {
408 nextAvailableReset = System.currentTimeMillis() + 60000;
409 tempCluster = cluster;
410 tempSession = session;
413 trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");
418 if(reset || session == null) {
419 TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);
421 // Note: Maitrayee recommended not closing the cluster, just
422 // overwrite it. Jonathan 9/30/2016 assuming same for Session
423 // This was a bad idea. Ran out of File Handles as I suspected, Jonathan
425 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
429 if(reset || cluster==null) {
430 cluster = CassAccess.cluster(trans, keyspace);
431 trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);
433 if(reset || session==null) {
434 session = cluster.connect(keyspace);
435 trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);
444 TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);
447 // Not clearing Session/Cluster appears to kill off FileHandles
448 if(tempSession!=null && !tempSession.isClosed()) {
451 if(tempCluster!=null && !tempCluster.isClosed()) {
462 public final boolean reportPerhapsReset(TransStore trans, Exception e) {
463 if(owningDAO!=null) {
464 return owningDAO.reportPerhapsReset(trans, e);
467 if(CassAccess.isResetException(e)) {
468 trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());
469 resetDeque.addFirst(new ResetRequest(session));
470 rv = resetTrigger = true;
472 trans.error().log(e);
477 public void close(TransStore trans) {
478 if(owningDAO==null) {
480 TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);
488 trans.debug().log("close called(), Session already closed");
491 owningDAO.close(trans);
495 protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {
498 protected interface Accept<DATA> {
499 public boolean ok(DATA data);