1 /*******************************************************************************
\r
2 * ============LICENSE_START====================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
23 package com.att.dao;
\r
25 import java.io.IOException;
\r
26 import java.util.ArrayList;
\r
27 import java.util.Deque;
\r
28 import java.util.List;
\r
29 import java.util.concurrent.ConcurrentLinkedDeque;
\r
31 import com.att.authz.layer.Result;
\r
32 import com.att.dao.aaf.cass.Status;
\r
33 import com.att.inno.env.APIException;
\r
34 import com.att.inno.env.Env;
\r
35 import com.att.inno.env.Slot;
\r
36 import com.att.inno.env.TimeTaken;
\r
37 import com.att.inno.env.TransStore;
\r
38 import com.datastax.driver.core.BoundStatement;
\r
39 import com.datastax.driver.core.Cluster;
\r
40 import com.datastax.driver.core.ConsistencyLevel;
\r
41 import com.datastax.driver.core.ResultSet;
\r
42 import com.datastax.driver.core.ResultSetFuture;
\r
43 import com.datastax.driver.core.Row;
\r
44 import com.datastax.driver.core.Session;
\r
45 import com.datastax.driver.core.exceptions.DriverException;
\r
47 public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {
\r
48 protected static final char DOT = '.';
\r
49 protected static final char DOT_PLUS_ONE = '.'+1;
\r
50 protected static final String FIRST_CHAR = Character.toString((char)0);
\r
51 protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);
\r
52 protected static final int FIELD_COMMAS = 0;
\r
53 protected static final int QUESTION_COMMAS = 1;
\r
54 protected static final int ASSIGNMENT_COMMAS = 2;
\r
55 protected static final int WHERE_ANDS = 3;
\r
57 private Cluster cluster;
\r
58 private Session session;
\r
59 private final String keyspace;
\r
60 // If this is null, then we own session
\r
61 private final AbsCassDAO<TRANS,?> owningDAO;
\r
62 protected Class<DATA> dataClass;
\r
63 private final String name;
\r
64 private static Slot sessionSlot;
\r
65 //private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<TransStore,?>.PSInfo>();
\r
66 private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo>();
\r
67 private static final List<Object> EMPTY = new ArrayList<Object>(0);
\r
68 private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();
\r
69 private static boolean resetTrigger = false;
\r
70 private static long nextAvailableReset = 0;
\r
73 public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {
\r
75 this.cluster = cluster;
\r
76 this.keyspace = keyspace;
\r
77 owningDAO = null; // we own session
\r
79 this.dataClass = dataClass;
\r
83 public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {
\r
85 cluster = aDao.cluster;
\r
86 keyspace = aDao.keyspace;
\r
88 owningDAO = aDao; // We do not own session
\r
89 this.dataClass = dataClass;
\r
92 public static void setSessionSlot(Slot slot) {
\r
96 //Note: Lower case ON PURPOSE. These names used to create History Messages
\r
98 create,read,update,delete
\r
103 public class PSInfo {
\r
104 private BoundStatement ps;
\r
105 private final int size;
\r
106 private final Loader<DATA> loader;
\r
107 private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader
\r
108 private final String cql;
\r
109 private final ConsistencyLevel consistency;
\r
113 * Create a PSInfo and create Prepared Statement
\r
119 public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {
\r
120 this.loader = loader;
\r
121 this.consistency=consistency;
\r
124 cql = theCQL.trim().toUpperCase();
\r
125 if(cql.startsWith("INSERT")) {
\r
126 crud = CRUD.create;
\r
127 } else if(cql.startsWith("UPDATE")) {
\r
128 crud = CRUD.update;
\r
129 } else if(cql.startsWith("DELETE")) {
\r
130 crud = CRUD.delete;
\r
135 int idx = 0, count=0;
\r
136 while((idx=cql.indexOf('?',idx))>=0) {
\r
143 public synchronized void reset() {
\r
147 private BoundStatement ps(TransStore trans) throws APIException, IOException {
\r
149 synchronized(this) {
\r
151 TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);
\r
153 ps = new BoundStatement(getSession(trans).prepare(cql));
\r
154 ps.setConsistencyLevel(consistency);
\r
155 } catch (DriverException e) {
\r
156 reportPerhapsReset(trans,e);
\r
168 * Execute a Prepared Statement by extracting from DATA object
\r
175 public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {
\r
176 TimeTaken tt = trans.start(text, Env.REMOTE);
\r
178 return Result.ok(getSession(trans).executeAsync(
\r
179 ps(trans).bind(loader.extract(data, size, crud))));
\r
180 } catch (DriverException | APIException | IOException e) {
\r
181 AbsCassDAO.this.reportPerhapsReset(trans,e);
\r
182 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
\r
189 * Execute a Prepared Statement on Object[] key
\r
196 public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {
\r
197 TimeTaken tt = trans.start(text, Env.REMOTE);
\r
199 return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));
\r
200 } catch (DriverException | APIException | IOException e) {
\r
201 AbsCassDAO.this.reportPerhapsReset(trans,e);
\r
202 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
\r
214 * Execute a Prepared Statement by extracting from DATA object
\r
221 public Result<ResultSet> exec(TRANS trans, String text, DATA data) {
\r
222 TimeTaken tt = trans.start(text, Env.REMOTE);
\r
225 * "execute" (and executeAsync)
\r
226 * Executes the provided query.
\r
227 This method blocks until at least some result has been received from the database. However,
\r
228 for SELECT queries, it does not guarantee that the result has been received in full. But it
\r
229 does guarantee that some response has been received from the database, and in particular
\r
230 guarantee that if the request is invalid, an exception will be thrown by this method.
\r
233 statement - the CQL query to execute (that can be any Statement).
\r
235 the result of the query. That result will never be null but can be empty (and will
\r
236 be for any non SELECT query).
\r
238 return Result.ok(getSession(trans).execute(
\r
239 ps(trans).bind(loader.extract(data, size, crud))));
\r
240 } catch (DriverException | APIException | IOException e) {
\r
241 AbsCassDAO.this.reportPerhapsReset(trans,e);
\r
242 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
\r
249 * Execute a Prepared Statement on Object[] key
\r
256 public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {
\r
257 TimeTaken tt = trans.start(text, Env.REMOTE);
\r
259 return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));
\r
260 } catch (DriverException | APIException | IOException e) {
\r
261 AbsCassDAO.this.reportPerhapsReset(trans,e);
\r
262 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
\r
269 * Read the Data from Cassandra given a Prepared Statement (defined by the
\r
272 * This is common behavior among all DAOs.
\r
273 * @throws DAOException
\r
275 public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {
\r
276 TimeTaken tt = trans.start(text,Env.REMOTE);
\r
280 rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));
\r
281 /// TEST CODE for Exception
\r
282 // boolean force = true;
\r
284 // Map<InetSocketAddress, Throwable> misa = new HashMap<InetSocketAddress,Throwable>();
\r
285 // //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));
\r
286 // misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));
\r
287 // throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);
\r
288 //// throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");
\r
291 } catch (DriverException | APIException | IOException e) {
\r
292 AbsCassDAO.this.reportPerhapsReset(trans,e);
\r
293 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
\r
298 return extract(loader,rs,null /*let Array be created if necessary*/,dflt);
\r
301 public Result<List<DATA>> read(TRANS trans, String text, DATA data) {
\r
302 return read(trans,text, loader.extract(data, size, crud));
\r
305 public Object[] keyFrom(DATA data) {
\r
306 return loader.extract(data, size, CRUD.delete); // Delete is key only
\r
310 * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected,
\r
311 * but we don't want a data leak if it does. Finalize doesn't have to happen quickly
\r
314 protected void finalize() throws Throwable {
\r
315 psinfos.remove(this);
\r
319 protected final Accept<DATA> dflt = new Accept<DATA>() {
\r
321 public boolean ok(DATA data) {
\r
327 @SuppressWarnings("unchecked")
\r
328 protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {
\r
329 List<Row> rows = rs.all();
\r
330 if(rows.isEmpty()) {
\r
331 return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);
\r
334 List<DATA> data = indata==null?new ArrayList<DATA>(rows.size()):indata;
\r
336 for(Row row : rows) {
\r
338 d = loader.load(dataClass.newInstance(),row);
\r
342 } catch(Exception e) {
\r
343 return Result.err(e);
\r
346 return Result.ok(data);
\r
350 private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
\r
351 private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
\r
352 private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
\r
354 private static class ResetRequest {
\r
355 //package on purpose
\r
359 public ResetRequest(Session session) {
\r
360 this.session = session;
\r
361 timestamp = System.currentTimeMillis();
\r
366 public static final void primePSIs(TransStore trans) throws APIException, IOException {
\r
367 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
\r
374 public final Session getSession(TransStore trans) throws APIException, IOException {
\r
375 // Try to use Trans' session, if exists
\r
376 if(sessionSlot!=null) { // try to get from Trans
\r
377 Session sess = trans.get(sessionSlot, null);
\r
383 // If there's an owning DAO, use it's session
\r
384 if(owningDAO!=null) {
\r
385 return owningDAO.getSession(trans);
\r
388 // OK, nothing else works... get our own.
\r
389 if(session==null || resetTrigger) {
\r
390 Cluster tempCluster = null;
\r
391 Session tempSession = null;
\r
393 synchronized(NEW_CASSANDRA_SESSION_CREATED) {
\r
394 boolean reset = false;
\r
395 for(ResetRequest r : resetDeque) {
\r
396 if(r.session == session) {
\r
397 if(r.timestamp>nextAvailableReset) {
\r
399 nextAvailableReset = System.currentTimeMillis() + 60000;
\r
400 tempCluster = cluster;
\r
401 tempSession = session;
\r
404 trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");
\r
409 if(reset || session == null) {
\r
410 TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);
\r
412 // Note: Maitrayee recommended not closing the cluster, just
\r
413 // overwrite it. 9/30/2016 assuming same for Session
\r
414 // This was a bad idea. Ran out of File Handles as I suspected..
\r
416 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
\r
420 if(reset || cluster==null) {
\r
421 cluster = CassAccess.cluster(trans, keyspace);
\r
422 trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);
\r
424 if(reset || session==null) {
\r
425 session = cluster.connect(keyspace);
\r
426 trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);
\r
429 resetTrigger=false;
\r
435 TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);
\r
437 resetDeque.clear();
\r
438 // Not clearing Session/Cluster appears to kill off FileHandles
\r
439 if(tempSession!=null && !tempSession.isClosed()) {
\r
440 tempSession.close();
\r
442 if(tempCluster!=null && !tempCluster.isClosed()) {
\r
443 tempCluster.close();
\r
453 public final boolean reportPerhapsReset(TransStore trans, Exception e) {
\r
454 if(owningDAO!=null) {
\r
455 return owningDAO.reportPerhapsReset(trans, e);
\r
457 boolean rv = false;
\r
458 if(CassAccess.isResetException(e)) {
\r
459 trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());
\r
460 resetDeque.addFirst(new ResetRequest(session));
\r
461 rv = resetTrigger = true;
\r
463 trans.error().log(e);
\r
468 public void close(TransStore trans) {
\r
469 if(owningDAO==null) {
\r
470 if(session!=null) {
\r
471 TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);
\r
479 trans.debug().log("close called(), Session already closed");
\r
482 owningDAO.close(trans);
\r
486 protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {
\r
489 protected interface Accept<DATA> {
\r
490 public boolean ok(DATA data);
\r