X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=auth%2Fauth-cass%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Faaf%2Fauth%2Fdao%2FAbsCassDAO.java;h=c68768e13fcbd8450f74f6b9d58ac148652d5fc1;hb=f5fdc4f2d1f87001364ccf462c1398a10e84fdcf;hp=f24c7c9faf171d649d7b67a4d118e530087a165c;hpb=4b5a7d721d994a49057e9bfb403c7bff1b376660;p=aaf%2Fauthz.git diff --git a/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/AbsCassDAO.java b/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/AbsCassDAO.java index f24c7c9f..c68768e1 100644 --- a/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/AbsCassDAO.java +++ b/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/AbsCassDAO.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -54,14 +54,14 @@ public abstract class AbsCassDAO { protected static final int ASSIGNMENT_COMMAS = 2; protected static final int WHERE_ANDS = 3; - private Cluster cluster; + private Cluster cluster; /* * From DataStax * com.datastax.driver.core.Session - A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes, - provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for + A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes, + provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for failed query (when it makes sense), etc... - Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one + Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one keyspace at a time, so one instance per keyspace is necessary. */ private Session session; @@ -73,10 +73,10 @@ public abstract class AbsCassDAO { // private static Slot sessionSlot; // not used since 2015 private static final ArrayList.PSInfo> psinfos = new ArrayList<>(); private static final List EMPTY = new ArrayList<>(0); - private static final Deque resetDeque = new ConcurrentLinkedDeque(); + private static final Deque resetDeque = new ConcurrentLinkedDeque<>(); private static boolean resetTrigger = false; private static long nextAvailableReset = 0; - + public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class dataClass) { this.name = name; this.cluster = cluster; @@ -95,11 +95,7 @@ public abstract class AbsCassDAO { owningDAO = aDao; this.dataClass = dataClass; } - -// Not used since 2015 -// public static void setSessionSlot(Slot slot) { -// sessionSlot = slot; -// } + //Note: Lower case ON PURPOSE. These names used to create History Messages public enum CRUD { @@ -117,7 +113,7 @@ public abstract class AbsCassDAO { /** * Create a PSInfo and create Prepared Statement - * + * * @param trans * @param theCQL * @param loader @@ -128,34 +124,34 @@ public abstract class AbsCassDAO { psinfos.add(this); cql = theCQL.trim().toUpperCase(); - if(cql.startsWith("INSERT")) { + if (cql.startsWith("INSERT")) { crud = CRUD.create; - } else if(cql.startsWith("UPDATE")) { + } else if (cql.startsWith("UPDATE")) { crud = CRUD.update; - } else if(cql.startsWith("DELETE")) { + } else if (cql.startsWith("DELETE")) { crud = CRUD.delete; } else { crud = CRUD.read; } - + int idx = 0, count=0; - while((idx=cql.indexOf('?',idx))>=0) { + while ((idx=cql.indexOf('?',idx))>=0) { ++idx; ++count; } size=count; } - + public synchronized void reset() { ps = null; } - + private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException { /* From Datastax - You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe). + You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe). If you call prepare multiple times with the same query string, the driver will log a warning. */ - if(ps==null) { + if (ps==null) { TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB); try { ps = getSession(trans).prepare(cql); @@ -173,7 +169,7 @@ public abstract class AbsCassDAO { /** * Execute a Prepared Statement by extracting from DATA object - * + * * @param trans * @param text * @param data @@ -194,7 +190,7 @@ public abstract class AbsCassDAO { /** * Execute a Prepared Statement on Object[] key - * + * * @param trans * @param text * @param objs @@ -211,15 +207,15 @@ public abstract class AbsCassDAO { tt.done(); } } - - /* + + /* * Note: - * + * */ /** * Execute a Prepared Statement by extracting from DATA object - * + * * @param trans * @param text * @param data @@ -231,15 +227,15 @@ public abstract class AbsCassDAO { /* * "execute" (and executeAsync) * Executes the provided query. - This method blocks until at least some result has been received from the database. However, - for SELECT queries, it does not guarantee that the result has been received in full. But it - does guarantee that some response has been received from the database, and in particular + This method blocks until at least some result has been received from the database. However, + for SELECT queries, it does not guarantee that the result has been received in full. But it + does guarantee that some response has been received from the database, and in particular guarantee that if the request is invalid, an exception will be thrown by this method. Parameters: statement - the CQL query to execute (that can be any Statement). Returns: - the result of the query. That result will never be null but can be empty (and will + the result of the query. That result will never be null but can be empty (and will be for any non SELECT query). */ return Result.ok(getSession(trans).execute( @@ -254,7 +250,7 @@ public abstract class AbsCassDAO { /** * Execute a Prepared Statement on Object[] key - * + * * @param trans * @param text * @param objs @@ -281,13 +277,13 @@ public abstract class AbsCassDAO { */ public Result> read(TRANS trans, String text, Object[] key) { TimeTaken tt = trans.start(text,Env.REMOTE); - + ResultSet rs; try { rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key)); -/// TEST CODE for Exception -// boolean force = true; -// if(force) { +/// TEST CODE for Exception +// boolean force = true; +// if (force) { // Map misa = new HashMap<>(); // //misa.put(new InetSocketAddress(444),new Exception("no host was tried")); // misa.put(new InetSocketAddress(444),new Exception("Connection has been closed")); @@ -301,20 +297,20 @@ public abstract class AbsCassDAO { } finally { tt.done(); } - + return extract(loader,rs,null /*let Array be created if necessary*/,dflt); } - + public Result> read(TRANS trans, String text, DATA data) { return read(trans,text, loader.extract(data, size, crud)); } - + public Object[] keyFrom(DATA data) { return loader.extract(data, size, CRUD.delete); // Delete is key only } /* - * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected, + * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected, * but we don't want a data leak if it does. Finalize doesn't have to happen quickly */ @Override @@ -334,26 +330,26 @@ public abstract class AbsCassDAO { @SuppressWarnings("unchecked") protected final Result> extract(Loader loader, ResultSet rs, List indata, Accept accept) { List rows = rs.all(); - if(rows.isEmpty()) { + if (rows.isEmpty()) { return Result.ok((List)EMPTY); // Result sets now .emptyList(true); } else { DATA d; List data = indata==null?new ArrayList<>(rows.size()):indata; - - for(Row row : rows) { + + for (Row row : rows) { try { d = loader.load(dataClass.newInstance(),row); - if(accept.ok(d)) { + if (accept.ok(d)) { data.add(d); } - } catch(Exception e) { + } catch (Exception e) { return Result.err(e); } } return Result.ok(data); } } - + private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created"; private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created"; private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session"; @@ -363,47 +359,47 @@ public abstract class AbsCassDAO { //package on purpose Session session; long timestamp; - + public ResetRequest(Session session) { this.session = session; timestamp = System.currentTimeMillis(); } } - + public static final void primePSIs(TransStore trans) throws APIException, IOException { - for(AbsCassDAO.PSInfo psi : psinfos) { - if(psi.ps==null) { + for (AbsCassDAO.PSInfo psi : psinfos) { + if (psi.ps==null) { psi.ps(trans); } } } - + public final Session getSession(TransStore trans) throws APIException, IOException { // SessionFilter unused since 2015 // Try to use Trans' session, if exists -// if(sessionSlot!=null) { // try to get from Trans +// if (sessionSlot!=null) { // try to get from Trans // Session sess = trans.get(sessionSlot, null); -// if(sess!=null) { +// if (sess!=null) { // return sess; // } // } - + // If there's an owning DAO, use it's session - if(owningDAO!=null) { + if (owningDAO!=null) { return owningDAO.getSession(trans); } - + // OK, nothing else works... get our own. - if(session==null || resetTrigger) { + if (session==null || resetTrigger) { Cluster tempCluster = null; Session tempSession = null; try { synchronized(LOCK) { boolean reset = false; - for(ResetRequest r : resetDeque) { - if(r.session == session) { - if(r.timestamp>nextAvailableReset) { + for (ResetRequest r : resetDeque) { + if (r.session == session) { + if (r.timestamp>nextAvailableReset) { reset=true; nextAvailableReset = System.currentTimeMillis() + 60000; tempCluster = cluster; @@ -414,23 +410,23 @@ public abstract class AbsCassDAO { } } } - - if(reset || session == null) { + + if (reset || session == null) { TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB); try { // Note: Maitrayee recommended not closing the cluster, just // overwrite it. Jonathan 9/30/2016 assuming same for Session // This was a bad idea. Ran out of File Handles as I suspected, Jonathan - if(reset) { - for(AbsCassDAO.PSInfo psi : psinfos) { + if (reset) { + for (AbsCassDAO.PSInfo psi : psinfos) { psi.reset(); } } - if(reset || cluster==null) { + if (reset || cluster==null) { cluster = CassAccess.cluster(trans, keyspace); trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED); } - if(reset || session==null) { + if (reset || session==null) { session = cluster.connect(keyspace); trans.warn().log(NEW_CASSANDRA_SESSION_CREATED); } @@ -445,10 +441,10 @@ public abstract class AbsCassDAO { try { resetDeque.clear(); // Not clearing Session/Cluster appears to kill off FileHandles - if(tempSession!=null && !tempSession.isClosed()) { + if (tempSession!=null && !tempSession.isClosed()) { tempSession.close(); } - if(tempCluster!=null && !tempCluster.isClosed()) { + if (tempCluster!=null && !tempCluster.isClosed()) { tempCluster.close(); } } finally { @@ -458,25 +454,25 @@ public abstract class AbsCassDAO { } return session; } - + public final boolean reportPerhapsReset(TransStore trans, Exception e) { - if(owningDAO!=null) { + if (owningDAO!=null) { return owningDAO.reportPerhapsReset(trans, e); } else { boolean rv = false; - if(CassAccess.isResetException(e)) { + if (CassAccess.isResetException(e)) { trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName()); resetDeque.addFirst(new ResetRequest(session)); rv = resetTrigger = true; - } + } trans.error().log(e); return rv; } } public void close(TransStore trans) { - if(owningDAO==null) { - if(session!=null) { + if (owningDAO==null) { + if (session!=null) { TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB); try { session.close(); @@ -494,7 +490,7 @@ public abstract class AbsCassDAO { protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) { } - + protected interface Accept { public boolean ok(DATA data); }