1 /*******************************************************************************
\r
2 * ============LICENSE_START====================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
23 package org.onap.aaf.dao;
\r
25 import java.io.IOException;
\r
26 import java.util.ArrayList;
\r
27 import java.util.Deque;
\r
28 import java.util.List;
\r
29 import java.util.concurrent.ConcurrentLinkedDeque;
\r
31 import org.onap.aaf.authz.layer.Result;
\r
32 import org.onap.aaf.dao.aaf.cass.Status;
\r
34 import org.onap.aaf.inno.env.APIException;
\r
35 import org.onap.aaf.inno.env.Env;
\r
36 import org.onap.aaf.inno.env.Slot;
\r
37 import org.onap.aaf.inno.env.TimeTaken;
\r
38 import org.onap.aaf.inno.env.TransStore;
\r
39 import com.datastax.driver.core.BoundStatement;
\r
40 import com.datastax.driver.core.Cluster;
\r
41 import com.datastax.driver.core.ConsistencyLevel;
\r
42 import com.datastax.driver.core.ResultSet;
\r
43 import com.datastax.driver.core.ResultSetFuture;
\r
44 import com.datastax.driver.core.Row;
\r
45 import com.datastax.driver.core.Session;
\r
46 import com.datastax.driver.core.exceptions.DriverException;
\r
48 public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {
\r
49 protected static final char DOT = '.';
\r
50 protected static final char DOT_PLUS_ONE = '.'+1;
\r
51 protected static final String FIRST_CHAR = Character.toString((char)0);
\r
52 protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);
\r
53 protected static final int FIELD_COMMAS = 0;
\r
54 protected static final int QUESTION_COMMAS = 1;
\r
55 protected static final int ASSIGNMENT_COMMAS = 2;
\r
56 protected static final int WHERE_ANDS = 3;
\r
58 private Cluster cluster;
\r
59 private Session session;
\r
60 private final String keyspace;
\r
61 // If this is null, then we own session
\r
62 private final AbsCassDAO<TRANS,?> owningDAO;
\r
63 protected Class<DATA> dataClass;
\r
64 private final String name;
\r
65 private static Slot sessionSlot;
\r
66 //private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<TransStore,?>.PSInfo>();
\r
67 private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo>();
\r
68 private static final List<Object> EMPTY = new ArrayList<Object>(0);
\r
69 private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();
\r
70 private static boolean resetTrigger = false;
\r
71 private static long nextAvailableReset = 0;
\r
74 public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {
\r
76 this.cluster = cluster;
\r
77 this.keyspace = keyspace;
\r
78 owningDAO = null; // we own session
\r
80 this.dataClass = dataClass;
\r
84 public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {
\r
86 cluster = aDao.cluster;
\r
87 keyspace = aDao.keyspace;
\r
89 owningDAO = aDao; // We do not own session
\r
90 this.dataClass = dataClass;
\r
93 public static void setSessionSlot(Slot slot) {
\r
97 //Note: Lower case ON PURPOSE. These names are used to create History Messages
\r
99 create,read,update,delete
\r
104 public class PSInfo {
\r
105 private BoundStatement ps;
\r
106 private final int size;
\r
107 private final Loader<DATA> loader;
\r
108 private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader
\r
109 private final String cql;
\r
110 private final ConsistencyLevel consistency;
\r
114 * Create a PSInfo and create Prepared Statement
\r
120 public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {
\r
121 this.loader = loader;
\r
122 this.consistency=consistency;
\r
125 cql = theCQL.trim().toUpperCase();
\r
126 if(cql.startsWith("INSERT")) {
\r
127 crud = CRUD.create;
\r
128 } else if(cql.startsWith("UPDATE")) {
\r
129 crud = CRUD.update;
\r
130 } else if(cql.startsWith("DELETE")) {
\r
131 crud = CRUD.delete;
\r
136 int idx = 0, count=0;
\r
137 while((idx=cql.indexOf('?',idx))>=0) {
\r
144 public synchronized void reset() {
\r
148 private BoundStatement ps(TransStore trans) throws APIException, IOException {
\r
150 synchronized(this) {
\r
152 TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);
\r
154 ps = new BoundStatement(getSession(trans).prepare(cql));
\r
155 ps.setConsistencyLevel(consistency);
\r
156 } catch (DriverException e) {
\r
157 reportPerhapsReset(trans,e);
\r
169 * Execute a Prepared Statement by extracting from DATA object
\r
176 public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {
\r
177 TimeTaken tt = trans.start(text, Env.REMOTE);
\r
179 return Result.ok(getSession(trans).executeAsync(
\r
180 ps(trans).bind(loader.extract(data, size, crud))));
\r
181 } catch (DriverException | APIException | IOException e) {
\r
182 AbsCassDAO.this.reportPerhapsReset(trans,e);
\r
183 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
\r
190 * Execute a Prepared Statement on Object[] key
\r
197 public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {
\r
198 TimeTaken tt = trans.start(text, Env.REMOTE);
\r
200 return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));
\r
201 } catch (DriverException | APIException | IOException e) {
\r
202 AbsCassDAO.this.reportPerhapsReset(trans,e);
\r
203 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
\r
215 * Execute a Prepared Statement by extracting from DATA object
\r
222 public Result<ResultSet> exec(TRANS trans, String text, DATA data) {
\r
223 TimeTaken tt = trans.start(text, Env.REMOTE);
\r
226 * "execute" (and executeAsync)
\r
227 * Executes the provided query.
\r
228 This method blocks until at least some result has been received from the database. However,
\r
229 for SELECT queries, it does not guarantee that the result has been received in full. But it
\r
230 does guarantee that some response has been received from the database, and in particular
\r
231 guarantee that if the request is invalid, an exception will be thrown by this method.
\r
234 statement - the CQL query to execute (that can be any Statement).
\r
236 the result of the query. That result will never be null but can be empty (and will
\r
237 be for any non SELECT query).
\r
239 return Result.ok(getSession(trans).execute(
\r
240 ps(trans).bind(loader.extract(data, size, crud))));
\r
241 } catch (DriverException | APIException | IOException e) {
\r
242 AbsCassDAO.this.reportPerhapsReset(trans,e);
\r
243 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
\r
250 * Execute a Prepared Statement on Object[] key
\r
257 public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {
\r
258 TimeTaken tt = trans.start(text, Env.REMOTE);
\r
260 return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));
\r
261 } catch (DriverException | APIException | IOException e) {
\r
262 AbsCassDAO.this.reportPerhapsReset(trans,e);
\r
263 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
\r
270 * Read the Data from Cassandra given a Prepared Statement (defined by the
\r
273 * This is common behavior among all DAOs.
\r
274 * @throws DAOException
\r
276 public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {
\r
277 TimeTaken tt = trans.start(text,Env.REMOTE);
\r
281 rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));
\r
282 /// TEST CODE for Exception
\r
283 // boolean force = true;
\r
285 // Map<InetSocketAddress, Throwable> misa = new HashMap<InetSocketAddress,Throwable>();
\r
286 // //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));
\r
287 // misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));
\r
288 // throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);
\r
289 //// throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");
\r
292 } catch (DriverException | APIException | IOException e) {
\r
293 AbsCassDAO.this.reportPerhapsReset(trans,e);
\r
294 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
\r
299 return extract(loader,rs,null /*let Array be created if necessary*/,dflt);
\r
302 public Result<List<DATA>> read(TRANS trans, String text, DATA data) {
\r
303 return read(trans,text, loader.extract(data, size, crud));
\r
306 public Object[] keyFrom(DATA data) {
\r
307 return loader.extract(data, size, CRUD.delete); // Delete is key only
\r
311 * Note: in case PSInfos are deleted, we want to remove them from the list. This is not expected,
\r
312 * but we don't want a data leak if it happens. Finalize doesn't have to happen quickly
\r
315 protected void finalize() throws Throwable {
\r
316 psinfos.remove(this);
\r
320 protected final Accept<DATA> dflt = new Accept<DATA>() {
\r
322 public boolean ok(DATA data) {
\r
328 @SuppressWarnings("unchecked")
\r
329 protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {
\r
330 List<Row> rows = rs.all();
\r
331 if(rows.isEmpty()) {
\r
332 return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);
\r
335 List<DATA> data = indata==null?new ArrayList<DATA>(rows.size()):indata;
\r
337 for(Row row : rows) {
\r
339 d = loader.load(dataClass.newInstance(),row);
\r
343 } catch(Exception e) {
\r
344 return Result.err(e);
\r
347 return Result.ok(data);
\r
351 private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
\r
352 private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
\r
353 private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
\r
355 private static class ResetRequest {
\r
356 //package on purpose
\r
360 public ResetRequest(Session session) {
\r
361 this.session = session;
\r
362 timestamp = System.currentTimeMillis();
\r
367 public static final void primePSIs(TransStore trans) throws APIException, IOException {
\r
368 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
\r
375 public final Session getSession(TransStore trans) throws APIException, IOException {
\r
376 // Try to use Trans' session, if exists
\r
377 if(sessionSlot!=null) { // try to get from Trans
\r
378 Session sess = trans.get(sessionSlot, null);
\r
384 // If there's an owning DAO, use its session
\r
385 if(owningDAO!=null) {
\r
386 return owningDAO.getSession(trans);
\r
389 // OK, nothing else works... get our own.
\r
390 if(session==null || resetTrigger) {
\r
391 Cluster tempCluster = null;
\r
392 Session tempSession = null;
\r
394 synchronized(NEW_CASSANDRA_SESSION_CREATED) {
\r
395 boolean reset = false;
\r
396 for(ResetRequest r : resetDeque) {
\r
397 if(r.session == session) {
\r
398 if(r.timestamp>nextAvailableReset) {
\r
400 nextAvailableReset = System.currentTimeMillis() + 60000;
\r
401 tempCluster = cluster;
\r
402 tempSession = session;
\r
405 trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");
\r
410 if(reset || session == null) {
\r
411 TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);
\r
413 // Note: Maitrayee recommended not closing the cluster, just
\r
414 // overwrite it. 9/30/2016 assuming same for Session
\r
415 // This was a bad idea. We ran out of File Handles, as I suspected.
\r
417 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
\r
421 if(reset || cluster==null) {
\r
422 cluster = CassAccess.cluster(trans, keyspace);
\r
423 trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);
\r
425 if(reset || session==null) {
\r
426 session = cluster.connect(keyspace);
\r
427 trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);
\r
430 resetTrigger=false;
\r
436 TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);
\r
438 resetDeque.clear();
\r
439 // Not clearing Session/Cluster appears to kill off FileHandles
\r
440 if(tempSession!=null && !tempSession.isClosed()) {
\r
441 tempSession.close();
\r
443 if(tempCluster!=null && !tempCluster.isClosed()) {
\r
444 tempCluster.close();
\r
454 public final boolean reportPerhapsReset(TransStore trans, Exception e) {
\r
455 if(owningDAO!=null) {
\r
456 return owningDAO.reportPerhapsReset(trans, e);
\r
458 boolean rv = false;
\r
459 if(CassAccess.isResetException(e)) {
\r
460 trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());
\r
461 resetDeque.addFirst(new ResetRequest(session));
\r
462 rv = resetTrigger = true;
\r
464 trans.error().log(e);
\r
469 public void close(TransStore trans) {
\r
470 if(owningDAO==null) {
\r
471 if(session!=null) {
\r
472 TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);
\r
480 trans.debug().log("close called(), Session already closed");
\r
483 owningDAO.close(trans);
\r
487 protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {
\r
490 protected interface Accept<DATA> {
\r
491 public boolean ok(DATA data);
\r