2 * ============LICENSE_START====================================================
4 * ===========================================================================
5 * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6 * ===========================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END====================================================
22 package org.onap.aaf.auth.dao;
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Deque;
27 import java.util.List;
28 import java.util.concurrent.ConcurrentLinkedDeque;
30 import org.onap.aaf.auth.dao.cass.Status;
31 import org.onap.aaf.auth.layer.Result;
32 import org.onap.aaf.misc.env.APIException;
33 import org.onap.aaf.misc.env.Env;
34 import org.onap.aaf.misc.env.TimeTaken;
35 import org.onap.aaf.misc.env.TransStore;
37 import com.datastax.driver.core.BoundStatement;
38 import com.datastax.driver.core.Cluster;
39 import com.datastax.driver.core.ConsistencyLevel;
40 import com.datastax.driver.core.PreparedStatement;
41 import com.datastax.driver.core.ResultSet;
42 import com.datastax.driver.core.ResultSetFuture;
43 import com.datastax.driver.core.Row;
44 import com.datastax.driver.core.Session;
45 import com.datastax.driver.core.exceptions.DriverException;
// Abstract base for Cassandra-backed DAOs, parameterised by the transaction type
// (TRANS) and the row data type (DATA). As far as this file shows, it holds the
// Cluster, Session and keyspace, provides the PSInfo prepared-statement wrapper,
// and centralises session creation and reset handling.
47 public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {
// The period character.
48 protected static final char DOT = '.';
// The character whose code is one above DOT (the slash character in ASCII).
// NOTE(review): presumably an exclusive upper bound for prefix range queries - confirm with callers.
49 protected static final char DOT_PLUS_ONE = '.'+1;
// One-character string holding char value 0, the lowest possible char.
50 protected static final String FIRST_CHAR = Character.toString((char)0);
// One-character string holding Character.MAX_VALUE, the highest possible char.
51 protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);
// Integer selectors 0 to 3. They are not used in the visible part of this file.
// NOTE(review): the names suggest modes for formatting CQL fragments (field lists,
// bind markers, SET assignments, WHERE clauses) - verify against the code that consumes them.
52 protected static final int FIELD_COMMAS = 0;
53 protected static final int QUESTION_COMMAS = 1;
54 protected static final int ASSIGNMENT_COMMAS = 2;
55 protected static final int WHERE_ANDS = 3;
// Cluster handle. Assigned by the constructors and replaced in getSession when a
// reset is honoured or when it is null.
57 private Cluster cluster;
60 * com.datastax.driver.core.Session
61 A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes,
62 provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for
63 failed query (when it makes sense), etc...
64 Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one
65 keyspace at a time, so one instance per keyspace is necessary.
// Session of a DAO that owns its connection. getSession creates it through
// cluster.connect(keyspace).
67 private Session session;
// Keyspace handed to cluster.connect and to CassAccess.cluster.
68 private final String keyspace;
69 // If this is null, then we own session
70 private final AbsCassDAO<TRANS,?> owningDAO;
// Row data class. extract instantiates it reflectively for every returned row.
71 protected Class<DATA> dataClass;
// DAO name, used in the timing label built by PSInfo.ps.
72 private final String name;
73 // private static Slot sessionSlot; // not used since 2015
// Process-wide list of PSInfo instances across all DAOs. primePSIs and getSession
// iterate over it, and PSInfo.finalize removes entries from it.
74 private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<>();
// Shared empty list that extract returns when a query yields no rows.
// NOTE(review): this is a mutable ArrayList handed out to callers - confirm no caller adds to it.
75 private static final List<Object> EMPTY = new ArrayList<>(0);
// Reset requests queued by reportPerhapsReset and examined by getSession.
76 private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<>();
// Set to true by reportPerhapsReset and tested by getSession.
// NOTE(review): not volatile - confirm every access happens under a common lock.
77 private static boolean resetTrigger = false;
// Epoch-millis threshold that a request timestamp must exceed before getSession
// honours the reset. getSession moves it to the current time plus 60000 ms.
78 private static long nextAvailableReset = 0;
// Creates a DAO that owns its Cassandra session: stores the given cluster, keyspace
// and data class, and sets owningDAO to null.
// The trans argument is not used in the lines visible here.
// NOTE(review): the assignment of the name field is not visible in this excerpt.
80 public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {
82 this.cluster = cluster;
83 this.keyspace = keyspace;
84 owningDAO = null; // we own session
86 this.dataClass = dataClass;
// Creates a DAO that reuses the connection of another DAO: the cluster and keyspace
// are copied from aDao.
// NOTE(review): presumably owningDAO is set to aDao and name is stored on lines not
// visible here - confirm.
89 public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {
91 cluster = aDao.cluster;
92 keyspace = aDao.keyspace;
94 // We do not own session
96 this.dataClass = dataClass;
// The four operation kinds. Each PSInfo stores one of them and passes it to
// Loader.extract when binding values.
100 //Note: Lower case ON PURPOSE. These names used to create History Messages
102 create,read,update,delete;
// Wraps one CQL statement of this DAO: the CQL text, its prepared statement, the
// Loader used to bind DATA values, and the consistency level applied when preparing.
// It is a non-static inner class and calls back into the enclosing DAO
// (getSession, reportPerhapsReset).
105 public class PSInfo {
// Prepared statement, assigned in ps().
106 private PreparedStatement ps;
// Count handed to loader.extract when binding values.
// NOTE(review): presumably the number of bind markers counted in the constructor -
// the assignment is not visible here.
107 private final int size;
// Converts DATA objects to bind values and rows back to DATA objects.
108 private final Loader<DATA> loader;
109 private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader
// The CQL text exactly as it is passed to Session.prepare.
110 private final String cql;
// Consistency level set on the prepared statement in ps().
111 private final ConsistencyLevel consistency;
// Stores the loader and consistency level, normalises the CQL text, branches on the
// leading keyword (INSERT, UPDATE, DELETE) and scans the text for bind markers.
// NOTE(review): the assignments to crud and size sit on lines not visible here -
// presumably crud follows the leading keyword and size is the marker count.
115 * Create a PSInfo and create Prepared Statement
121 public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {
122 this.loader = loader;
123 this.consistency=consistency;
// NOTE(review): the whole statement is upper-cased with the default locale, and this
// same string is later handed to prepare(). Any quoted identifier or string literal
// inside theCQL would be upper-cased too - confirm that no CQL passed here relies on case.
126 cql = theCQL.trim().toUpperCase();
127 if (cql.startsWith("INSERT")) {
129 } else if (cql.startsWith("UPDATE")) {
131 } else if (cql.startsWith("DELETE")) {
// Walk through every occurrence of the bind marker character in the CQL text.
137 int idx = 0, count=0;
138 while ((idx=cql.indexOf('?',idx))>=0) {
// Synchronized on this PSInfo, the same monitor that ps() uses.
// NOTE(review): the body is not visible here - presumably it discards the cached
// prepared statement so that the next ps() call prepares it again. Confirm.
145 public synchronized void reset() {
// Returns a fresh BoundStatement for this PSInfo. The visible lines prepare the CQL
// on the current session (timed as a sub-step), apply the configured consistency
// level, and report a DriverException to reportPerhapsReset.
// Throws APIException or IOException as declared; getSession declares both.
// NOTE(review): the guard that decides when to prepare and the handling after the
// catch are on lines not visible here.
149 private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException {
151 You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe).
152 If you call prepare multiple times with the same query string, the driver will log a warning.
155 TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);
157 ps = getSession(trans).prepare(cql);
158 ps.setConsistencyLevel(consistency);
159 } catch (DriverException e) {
160 reportPerhapsReset(trans,e);
166 // BoundStatements are NOT threadsafe... need a new one each time.
167 return new BoundStatement(ps);
// Asynchronous execution with values taken from a DATA object.
// Binds loader.extract(data, size, crud) to a fresh BoundStatement and submits it
// with executeAsync, timed under the label given in text.
// Returns Result.ok wrapping the ResultSetFuture. On DriverException, APIException
// or IOException the error is passed to reportPerhapsReset and a Result.err with
// Status.ERR_Backend, naming the exception class, message and CQL, is returned.
// NOTE(review): completion of the timer is on lines not visible here.
171 * Execute a Prepared Statement by extracting from DATA object
178 public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {
179 TimeTaken tt = trans.start(text, Env.REMOTE);
181 return Result.ok(getSession(trans).executeAsync(
182 ps(trans).bind(loader.extract(data, size, crud))));
183 } catch (DriverException | APIException | IOException e) {
184 AbsCassDAO.this.reportPerhapsReset(trans,e);
185 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
// Asynchronous execution with explicitly supplied bind values.
// Binds objs to a fresh BoundStatement and submits it with executeAsync, timed under
// the label given in text.
// Returns Result.ok wrapping the ResultSetFuture, or after reporting the exception
// to reportPerhapsReset, a Result.err with Status.ERR_Backend.
192 * Execute a Prepared Statement on Object[] key
199 public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {
200 TimeTaken tt = trans.start(text, Env.REMOTE);
202 return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));
203 } catch (DriverException | APIException | IOException e) {
204 AbsCassDAO.this.reportPerhapsReset(trans,e);
205 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
// Blocking execution with values taken from a DATA object.
// Binds loader.extract(data, size, crud) to a fresh BoundStatement and runs it with
// Session.execute, timed under the label given in text.
// Returns Result.ok wrapping the ResultSet. On DriverException, APIException or
// IOException the error is passed to reportPerhapsReset and a Result.err with
// Status.ERR_Backend is returned, so no exception reaches the caller from this path.
217 * Execute a Prepared Statement by extracting from DATA object
224 public Result<ResultSet> exec(TRANS trans, String text, DATA data) {
225 TimeTaken tt = trans.start(text, Env.REMOTE);
228 * "execute" (and executeAsync)
229 * Executes the provided query.
230 This method blocks until at least some result has been received from the database. However,
231 for SELECT queries, it does not guarantee that the result has been received in full. But it
232 does guarantee that some response has been received from the database, and in particular
233 guarantee that if the request is invalid, an exception will be thrown by this method.
236 statement - the CQL query to execute (that can be any Statement).
238 the result of the query. That result will never be null but can be empty (and will
239 be for any non SELECT query).
241 return Result.ok(getSession(trans).execute(
242 ps(trans).bind(loader.extract(data, size, crud))));
243 } catch (DriverException | APIException | IOException e) {
244 AbsCassDAO.this.reportPerhapsReset(trans,e);
245 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
// Blocking execution with explicitly supplied bind values.
// Binds objs to a fresh BoundStatement and runs it with Session.execute, timed under
// the label given in text.
// Returns Result.ok wrapping the ResultSet, or after reporting the exception to
// reportPerhapsReset, a Result.err with Status.ERR_Backend.
252 * Execute a Prepared Statement on Object[] key
259 public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {
260 TimeTaken tt = trans.start(text, Env.REMOTE);
262 return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));
263 } catch (DriverException | APIException | IOException e) {
264 AbsCassDAO.this.reportPerhapsReset(trans,e);
265 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
// Runs this statement and converts the returned rows to DATA objects.
// When key is null the statement is executed without binding any values; otherwise
// key is bound first. Rows are converted by extract using this PSInfo loader and the
// dflt acceptor, letting extract allocate the result list.
// NOTE(review): the signature declares no checked exception. Backend failures are
// reported to reportPerhapsReset and returned as Result.err with Status.ERR_Backend,
// so the throws tag in the existing text below looks stale.
272 * Read the Data from Cassandra given a Prepared Statement (defined by the
275 * This is common behavior among all DAOs.
276 * @throws DAOException
278 public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {
279 TimeTaken tt = trans.start(text,Env.REMOTE);
283 rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));
284 /// TEST CODE for Exception
285 // boolean force = true;
287 // Map<InetSocketAddress, Throwable> misa = new HashMap<>();
288 // //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));
289 // misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));
290 // throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);
291 //// throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");
294 } catch (DriverException | APIException | IOException e) {
295 AbsCassDAO.this.reportPerhapsReset(trans,e);
296 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
301 return extract(loader,rs,null /*let Array be created if necessary*/,dflt);
// Convenience overload: derives the bind values from data with
// loader.extract(data, size, crud) and delegates to the Object[] variant of read.
304 public Result<List<DATA>> read(TRANS trans, String text, DATA data) {
305 return read(trans,text, loader.extract(data, size, crud));
// Returns the values that loader.extract produces for data in CRUD.delete mode,
// which per the existing inline comment are the key fields only.
308 public Object[] keyFrom(DATA data) {
309 return loader.extract(data, size, CRUD.delete); // Delete is key only
313 * Note: in case PSInfos are deleted, we want to remove them from the list. This is not expected,
314 * but we don't want a memory leak if it happens. Finalize doesn't have to happen quickly
// Removes this PSInfo from the static psinfos list when the instance is finalized.
// NOTE(review): Object.finalize is deprecated since Java 9, and psinfos is a plain
// ArrayList that is modified here - confirm this cannot race with the loops over
// psinfos in primePSIs and getSession.
317 protected void finalize() throws Throwable {
318 psinfos.remove(this);
// Default acceptor that PSInfo.read hands to extract.
// NOTE(review): the body of ok is not visible here - presumably it accepts every
// row. Confirm.
322 protected final Accept<DATA> dflt = new Accept<DATA>() {
324 public boolean ok(DATA data) {
// Converts every row of rs into a DATA object.
// All rows are fetched with rs.all(). With no rows, the shared EMPTY list is
// returned (hence the unchecked cast). Otherwise results go into indata when it is
// non-null, or into a new ArrayList sized to the row count. Each row is loaded into a
// new instance of dataClass. Any exception raised while loading ends the method
// with Result.err(e).
// NOTE(review): how accept filters rows is on lines not visible here.
// NOTE(review): Class.newInstance is deprecated since Java 9 in favour of
// getDeclaredConstructor().newInstance().
330 @SuppressWarnings("unchecked")
331 protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {
332 List<Row> rows = rs.all();
333 if (rows.isEmpty()) {
334 return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);
337 List<DATA> data = indata==null?new ArrayList<>(rows.size()):indata;
339 for (Row row : rows) {
341 d = loader.load(dataClass.newInstance(),row);
345 } catch (Exception e) {
346 return Result.err(e);
349 return Result.ok(data);
// Warning text logged by getSession after a new session has been connected.
353 private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
// Warning text logged by getSession after a new Cluster object has been obtained.
354 private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
// Timing label for the session (re)creation step in getSession.
355 private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
// NOTE(review): no use of LOCK is visible in this excerpt - presumably it guards
// the reset section of getSession. Confirm.
356 private static final Object LOCK = new Object();
// Records a request to reset a session: the session that was current when the
// request was made and the wall-clock time (epoch millis) at which it was created.
// getSession compares both values when deciding whether to honour the request.
358 private static class ResetRequest {
363 public ResetRequest(Session session) {
364 this.session = session;
365 timestamp = System.currentTimeMillis();
// Iterates over every PSInfo in the static psinfos list.
// NOTE(review): the loop body is not visible here - presumably it prepares each
// statement ahead of first use, which would explain the declared APIException and
// IOException. Confirm.
370 public static final void primePSIs(TransStore trans) throws APIException, IOException {
371 for (AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
// Returns the session to use for this DAO.
// A DAO with an owningDAO always delegates to it. Otherwise, when there is no
// session yet or a reset has been triggered, the visible lines do the following:
//  - scan resetDeque for requests that refer to the current session object, and
//    consider one only when its timestamp is later than nextAvailableReset;
//  - for such a request, move nextAvailableReset 60000 ms into the future and
//    remember the current cluster and session in temporaries;
//  - log a warning for requests ignored as too recent;
//  - when resetting or when no session exists, loop over all PSInfos, obtain a new
//    cluster from CassAccess if needed and connect a new session to the keyspace;
//  - under a timer labelled Clear Reset Deque, check whether the remembered
//    session and cluster are still open.
// NOTE(review): the synchronization, the PSInfo loop body, the closing of the old
// session and cluster, the clearing of resetTrigger and the final return are on
// lines not visible here.
378 public final Session getSession(TransStore trans) throws APIException, IOException {
379 // SessionFilter unused since 2015
380 // Try to use Trans' session, if exists
381 // if (sessionSlot!=null) { // try to get from Trans
382 // Session sess = trans.get(sessionSlot, null);
388 // If there's an owning DAO, use its session
389 if (owningDAO!=null) {
390 return owningDAO.getSession(trans);
393 // OK, nothing else works... get our own.
394 if (session==null || resetTrigger) {
395 Cluster tempCluster = null;
396 Session tempSession = null;
399 boolean reset = false;
400 for (ResetRequest r : resetDeque) {
// Identity comparison: only requests raised against the session currently in use count.
401 if (r.session == session) {
402 if (r.timestamp>nextAvailableReset) {
404 nextAvailableReset = System.currentTimeMillis() + 60000;
405 tempCluster = cluster;
406 tempSession = session;
409 trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");
414 if (reset || session == null) {
415 TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);
417 // Note: Maitrayee recommended not closing the cluster, just
418 // overwrite it. Jonathan 9/30/2016 assuming same for Session
419 // This was a bad idea. Ran out of File Handles as I suspected, Jonathan
421 for (AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
425 if (reset || cluster==null) {
426 cluster = CassAccess.cluster(trans, keyspace);
427 trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);
429 if (reset || session==null) {
430 session = cluster.connect(keyspace);
431 trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);
440 TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);
443 // Not clearing Session/Cluster appears to kill off FileHandles
444 if (tempSession!=null && !tempSession.isClosed()) {
447 if (tempCluster!=null && !tempCluster.isClosed()) {
// Reports an exception and, when it qualifies, requests a session reset.
// A DAO with an owningDAO delegates to it. Otherwise, when
// CassAccess.isResetException(e) is true, a warning is logged, a ResetRequest for
// the current session is pushed to the front of resetDeque, and both resetTrigger
// and the result flag rv are set to true. The exception is also logged at error level.
// NOTE(review): the declaration of rv, the branch that holds the error log and the
// return statement are on lines not visible here - presumably rv is returned.
// NOTE(review): e is tested for null in the warning only after it has already been
// passed to isResetException - confirm that a null e is supported there.
458 public final boolean reportPerhapsReset(TransStore trans, Exception e) {
459 if (owningDAO!=null) {
460 return owningDAO.reportPerhapsReset(trans, e);
463 if (CassAccess.isResetException(e)) {
464 trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());
465 resetDeque.addFirst(new ResetRequest(session));
466 rv = resetTrigger = true;
468 trans.error().log(e);
// Closes the Cassandra session behind this DAO.
// A DAO that owns its session handles the close itself, timed under the label
// Cassandra Session Close, and logs a debug message when the session was already
// closed. A DAO with an owningDAO forwards the call to it.
// NOTE(review): the actual close call is on lines not visible here.
// NOTE(review): forwarding means that closing a non-owning DAO closes the session
// shared by every DAO attached to the same owner - confirm this is intended.
473 public void close(TransStore trans) {
474 if (owningDAO==null) {
476 TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);
484 trans.debug().log("close called(), Session already closed");
487 owningDAO.close(trans);
// Protected, non-final method that receives the transaction, the kind of
// modification, the affected data and optional override strings.
// NOTE(review): the body is not visible here - presumably a no-op hook that
// subclasses override to record changes. Confirm.
491 protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {
// Single-method callback taken by extract as its accept parameter; dflt is the
// default instance used by PSInfo.read.
// NOTE(review): presumably ok returns true for rows to keep in the result - the
// call site inside extract is not visible here.
494 protected interface Accept<DATA> {
495 public boolean ok(DATA data);