AT&T 2.0.19 Code drop, stage 5
[aaf/authz.git] / authz-cass / src / main / java / org / onap / aaf / dao / AbsCassDAO.java
diff --git a/authz-cass/src/main/java/org/onap/aaf/dao/AbsCassDAO.java b/authz-cass/src/main/java/org/onap/aaf/dao/AbsCassDAO.java
deleted file mode 100644 (file)
index c76a88f..0000000
+++ /dev/null
@@ -1,497 +0,0 @@
-/*******************************************************************************\r
- * ============LICENSE_START====================================================\r
- * * org.onap.aaf\r
- * * ===========================================================================\r
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * * ===========================================================================\r
- * * Licensed under the Apache License, Version 2.0 (the "License");\r
- * * you may not use this file except in compliance with the License.\r
- * * You may obtain a copy of the License at\r
- * * \r
- *  *      http://www.apache.org/licenses/LICENSE-2.0\r
- * * \r
- *  * Unless required by applicable law or agreed to in writing, software\r
- * * distributed under the License is distributed on an "AS IS" BASIS,\r
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * * See the License for the specific language governing permissions and\r
- * * limitations under the License.\r
- * * ============LICENSE_END====================================================\r
- * *\r
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
- * *\r
- ******************************************************************************/\r
-package org.onap.aaf.dao;\r
-\r
-import java.io.IOException;\r
-import java.util.ArrayList;\r
-import java.util.Deque;\r
-import java.util.List;\r
-import java.util.concurrent.ConcurrentLinkedDeque;\r
-\r
-import org.onap.aaf.authz.layer.Result;\r
-import org.onap.aaf.dao.aaf.cass.Status;\r
-\r
-import org.onap.aaf.inno.env.APIException;\r
-import org.onap.aaf.inno.env.Env;\r
-import org.onap.aaf.inno.env.Slot;\r
-import org.onap.aaf.inno.env.TimeTaken;\r
-import org.onap.aaf.inno.env.TransStore;\r
-import com.datastax.driver.core.BoundStatement;\r
-import com.datastax.driver.core.Cluster;\r
-import com.datastax.driver.core.ConsistencyLevel;\r
-import com.datastax.driver.core.ResultSet;\r
-import com.datastax.driver.core.ResultSetFuture;\r
-import com.datastax.driver.core.Row;\r
-import com.datastax.driver.core.Session;\r
-import com.datastax.driver.core.exceptions.DriverException;\r
-\r
-public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {\r
-       protected static final char DOT = '.';\r
-       protected static final char DOT_PLUS_ONE = '.'+1;\r
-       protected static final String FIRST_CHAR = Character.toString((char)0);\r
-       protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);\r
-       protected static final int FIELD_COMMAS = 0;\r
-       protected static final int QUESTION_COMMAS = 1;\r
-       protected static final int ASSIGNMENT_COMMAS = 2;\r
-       protected static final int WHERE_ANDS = 3;\r
-       \r
-       private Cluster cluster; \r
-       private Session session;\r
-       private final String keyspace;\r
-       // If this is null, then we own session\r
-       private final AbsCassDAO<TRANS,?> owningDAO;\r
-       protected Class<DATA> dataClass;\r
-       private final String name;\r
-       private static Slot sessionSlot;\r
-       //private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<TransStore,?>.PSInfo>();\r
-       private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo>();\r
-       private static final List<Object> EMPTY = new ArrayList<Object>(0);\r
-       private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();\r
-       private static boolean resetTrigger = false;\r
-       private static long nextAvailableReset = 0;\r
-       \r
-\r
-       public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {\r
-               this.name = name;\r
-               this.cluster = cluster;\r
-               this.keyspace = keyspace;\r
-               owningDAO = null;  // we own session\r
-               session = null;\r
-               this.dataClass = dataClass;\r
-               \r
-       }\r
-\r
-       public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {      \r
-               this.name = name;\r
-               cluster = aDao.cluster;\r
-               keyspace = aDao.keyspace;\r
-               session = null;\r
-               owningDAO = aDao; // We do not own session\r
-               this.dataClass = dataClass;\r
-       }\r
-       \r
-       public static void setSessionSlot(Slot slot) {\r
-               sessionSlot = slot;\r
-       }\r
-\r
-       //Note: Lower case ON PURPOSE. These names used to create History Messages\r
-       public enum CRUD {\r
-               create,read,update,delete\r
-       ;\r
-\r
-}\r
-\r
-       public class PSInfo {\r
-               private BoundStatement ps;\r
-               private final int size;\r
-               private final Loader<DATA> loader;\r
-               private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader\r
-               private final String cql;\r
-               private final ConsistencyLevel consistency;\r
-\r
-\r
-               /**\r
-                * Create a PSInfo and create Prepared Statement\r
-                * \r
-                * @param trans\r
-                * @param theCQL\r
-                * @param loader\r
-                */\r
-               public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {\r
-                       this.loader = loader;\r
-                       this.consistency=consistency;\r
-                       psinfos.add(this);\r
-\r
-                       cql = theCQL.trim().toUpperCase();\r
-                       if(cql.startsWith("INSERT")) {\r
-                               crud = CRUD.create;\r
-                       } else if(cql.startsWith("UPDATE")) {\r
-                               crud = CRUD.update;\r
-                       } else if(cql.startsWith("DELETE")) {\r
-                               crud = CRUD.delete;\r
-                       } else {\r
-                               crud = CRUD.read;\r
-                       }\r
-                       \r
-                       int idx = 0, count=0;\r
-                       while((idx=cql.indexOf('?',idx))>=0) {\r
-                               ++idx;\r
-                               ++count;\r
-                       }\r
-                       size=count;\r
-               }\r
-               \r
-               public synchronized void reset() {\r
-                       ps = null;\r
-               }\r
-               \r
-               private BoundStatement ps(TransStore trans) throws APIException, IOException {\r
-                       if(ps==null) {\r
-                               synchronized(this) {\r
-                                       if(ps==null) {\r
-                                               TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);\r
-                                               try {\r
-                                                       ps = new BoundStatement(getSession(trans).prepare(cql));\r
-                                                       ps.setConsistencyLevel(consistency);\r
-                                               } catch (DriverException e) {\r
-                                                       reportPerhapsReset(trans,e);\r
-                                                       throw e;\r
-                                               } finally {\r
-                                                       tt.done();\r
-                                               }\r
-                                       }\r
-                               }\r
-                       }\r
-                       return ps;\r
-               }\r
-\r
-               /**\r
-                * Execute a Prepared Statement by extracting from DATA object\r
-                * \r
-                * @param trans\r
-                * @param text\r
-                * @param data\r
-                * @return\r
-                */\r
-               public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {\r
-                       TimeTaken tt = trans.start(text, Env.REMOTE);\r
-                       try {\r
-                               return Result.ok(getSession(trans).executeAsync(\r
-                                               ps(trans).bind(loader.extract(data, size, crud))));\r
-                       } catch (DriverException | APIException | IOException e) {\r
-                               AbsCassDAO.this.reportPerhapsReset(trans,e);\r
-                               return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
-                       } finally {\r
-                               tt.done();\r
-                       }\r
-               }\r
-\r
-               /**\r
-                * Execute a Prepared Statement on Object[] key\r
-                * \r
-                * @param trans\r
-                * @param text\r
-                * @param objs\r
-                * @return\r
-                */\r
-               public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {\r
-                       TimeTaken tt = trans.start(text, Env.REMOTE);\r
-                       try {\r
-                               return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));\r
-                       } catch (DriverException | APIException | IOException e) {\r
-                               AbsCassDAO.this.reportPerhapsReset(trans,e);\r
-                               return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
-                       } finally {\r
-                               tt.done();\r
-                       }\r
-               }\r
-               \r
-               /* \r
-                * Note:\r
-                * \r
-                */\r
-\r
-               /**\r
-                * Execute a Prepared Statement by extracting from DATA object\r
-                * \r
-                * @param trans\r
-                * @param text\r
-                * @param data\r
-                * @return\r
-                */\r
-               public Result<ResultSet> exec(TRANS trans, String text, DATA data) {\r
-                       TimeTaken tt = trans.start(text, Env.REMOTE);\r
-                       try {\r
-                               /*\r
-                                * "execute" (and executeAsync)\r
-                                * Executes the provided query.\r
-                                       This method blocks until at least some result has been received from the database. However, \r
-                                       for SELECT queries, it does not guarantee that the result has been received in full. But it \r
-                                       does guarantee that some response has been received from the database, and in particular \r
-                                       guarantee that if the request is invalid, an exception will be thrown by this method.\r
-\r
-                                       Parameters:\r
-                                       statement - the CQL query to execute (that can be any Statement).\r
-                                       Returns:\r
-                                               the result of the query. That result will never be null but can be empty (and will \r
-                                               be for any non SELECT query).\r
-                                */\r
-                               return Result.ok(getSession(trans).execute(\r
-                                               ps(trans).bind(loader.extract(data, size, crud))));\r
-                       } catch (DriverException | APIException | IOException e) {\r
-                               AbsCassDAO.this.reportPerhapsReset(trans,e);\r
-                               return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
-                       } finally {\r
-                               tt.done();\r
-                       }\r
-               }\r
-\r
-               /**\r
-                * Execute a Prepared Statement on Object[] key\r
-                * \r
-                * @param trans\r
-                * @param text\r
-                * @param objs\r
-                * @return\r
-                */\r
-               public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {\r
-                       TimeTaken tt = trans.start(text, Env.REMOTE);\r
-                       try {\r
-                               return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));\r
-                       } catch (DriverException | APIException | IOException e) {\r
-                               AbsCassDAO.this.reportPerhapsReset(trans,e);\r
-                               return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
-                       } finally {\r
-                               tt.done();\r
-                       }\r
-               }\r
-\r
-               /**\r
-                * Read the Data from Cassandra given a Prepared Statement (defined by the\r
-                * DAO Instance)\r
-                *\r
-                * This is common behavior among all DAOs.\r
-                * @throws DAOException\r
-                */\r
-               public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {\r
-                       TimeTaken tt = trans.start(text,Env.REMOTE);\r
-                       \r
-                       ResultSet rs;\r
-                       try {\r
-                               rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));\r
-/// TEST CODE for Exception                            \r
-//                             boolean force = true; \r
-//                             if(force) {\r
-//                                     Map<InetSocketAddress, Throwable> misa = new HashMap<InetSocketAddress,Throwable>();\r
-//                                     //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));\r
-//                                     misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));\r
-//                                     throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);\r
-////                                   throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");\r
-//                             }\r
-//// END TEST CODE\r
-                       } catch (DriverException | APIException | IOException e) {\r
-                               AbsCassDAO.this.reportPerhapsReset(trans,e);\r
-                               return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
-                       } finally {\r
-                               tt.done();\r
-                       }\r
-                       \r
-                       return extract(loader,rs,null /*let Array be created if necessary*/,dflt);\r
-               }\r
-               \r
-               public Result<List<DATA>> read(TRANS trans, String text, DATA data) {\r
-                       return read(trans,text, loader.extract(data, size, crud));\r
-               }\r
-               \r
-               public Object[] keyFrom(DATA data) {\r
-                       return loader.extract(data, size, CRUD.delete); // Delete is key only\r
-               }\r
-\r
-               /*\r
-                * Note: in case PSInfos are deleted, we want to remove them from list.  This is not expected, \r
-                * but we don't want a data leak if it does.  Finalize doesn't have to happen quickly\r
-                */\r
-               @Override\r
-               protected void finalize() throws Throwable {\r
-                       psinfos.remove(this);\r
-               }\r
-       }\r
-\r
-       protected final Accept<DATA> dflt = new Accept<DATA>() {\r
-               @Override\r
-               public boolean ok(DATA data) {\r
-                       return true;\r
-               }\r
-       };\r
-\r
-\r
-       @SuppressWarnings("unchecked")\r
-    protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {\r
-               List<Row> rows = rs.all();\r
-               if(rows.isEmpty()) {\r
-                       return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);\r
-               } else {\r
-                       DATA d;\r
-                       List<DATA> data = indata==null?new ArrayList<DATA>(rows.size()):indata;\r
-                       \r
-                       for(Row row : rows) {\r
-                               try {\r
-                                       d = loader.load(dataClass.newInstance(),row);\r
-                                       if(accept.ok(d)) {\r
-                                               data.add(d);\r
-                                       }\r
-                               } catch(Exception e) {\r
-                                       return Result.err(e);\r
-                               }\r
-                       }\r
-                       return Result.ok(data);\r
-               }\r
-    }\r
-    \r
-       private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";\r
-       private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";\r
-       private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";\r
-\r
-       private static class ResetRequest {\r
-               //package on purpose\r
-               Session session;\r
-               long timestamp;\r
-               \r
-               public ResetRequest(Session session) {\r
-                       this.session = session;\r
-                       timestamp = System.currentTimeMillis();\r
-               }\r
-       }\r
-\r
-       \r
-       public static final void primePSIs(TransStore trans) throws APIException, IOException {\r
-               for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {\r
-                       if(psi.ps==null) {\r
-                               psi.ps(trans);\r
-                       }\r
-               }\r
-       }\r
-       \r
-       public final Session getSession(TransStore trans) throws APIException, IOException {\r
-               // Try to use Trans' session, if exists\r
-               if(sessionSlot!=null) { // try to get from Trans\r
-                       Session sess = trans.get(sessionSlot, null);\r
-                       if(sess!=null) {\r
-                               return sess;\r
-                       }\r
-               }\r
-               \r
-               // If there's an owning DAO, use it's session\r
-               if(owningDAO!=null) {\r
-                       return owningDAO.getSession(trans);\r
-               }\r
-               \r
-               // OK, nothing else works... get our own.\r
-               if(session==null || resetTrigger) {\r
-                       Cluster tempCluster = null;\r
-                       Session tempSession = null;\r
-                       try {\r
-                               synchronized(NEW_CASSANDRA_SESSION_CREATED) {\r
-                                       boolean reset = false;\r
-                                       for(ResetRequest r : resetDeque) {\r
-                                               if(r.session == session) {\r
-                                                       if(r.timestamp>nextAvailableReset) {\r
-                                                               reset=true;\r
-                                                               nextAvailableReset = System.currentTimeMillis() + 60000;\r
-                                                               tempCluster = cluster;\r
-                                                               tempSession = session;\r
-                                                               break;\r
-                                                       } else {\r
-                                                               trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");\r
-                                                       }\r
-                                               }\r
-                                       }\r
-       \r
-                                       if(reset || session == null) {\r
-                                               TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);\r
-                                               try {\r
-                                                       // Note: Maitrayee recommended not closing the cluster, just\r
-                                                       // overwrite it. 9/30/2016 assuming same for Session\r
-                                                       // This was a bad idea.  Ran out of File Handles as I suspected..\r
-                                                       if(reset) {\r
-                                                               for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {\r
-                                                                       psi.reset();\r
-                                                               }\r
-                                                       }\r
-                                                       if(reset || cluster==null) {\r
-                                                               cluster = CassAccess.cluster(trans, keyspace);\r
-                                                               trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);\r
-                                                       }\r
-                                                       if(reset || session==null) {\r
-                                                               session = cluster.connect(keyspace);\r
-                                                               trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);\r
-                                                       }\r
-                                               } finally {\r
-                                                       resetTrigger=false;\r
-                                                       tt.done();\r
-                                               }\r
-                                       }\r
-                               }\r
-                       } finally {\r
-                               TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);\r
-                               try {\r
-                                       resetDeque.clear();\r
-                                       // Not clearing Session/Cluster appears to kill off FileHandles\r
-                                       if(tempSession!=null && !tempSession.isClosed()) {\r
-                                               tempSession.close();\r
-                                       }\r
-                                       if(tempCluster!=null && !tempCluster.isClosed()) {\r
-                                               tempCluster.close();\r
-                                       }\r
-                               } finally {\r
-                                       tt.done();\r
-                               }\r
-                       }\r
-               }\r
-               return session;\r
-       }\r
-       \r
-       public final boolean reportPerhapsReset(TransStore trans, Exception e) {\r
-               if(owningDAO!=null) {\r
-                       return owningDAO.reportPerhapsReset(trans, e);\r
-               } else {\r
-                       boolean rv = false;\r
-                       if(CassAccess.isResetException(e)) {\r
-                               trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());\r
-                               resetDeque.addFirst(new ResetRequest(session));\r
-                               rv = resetTrigger = true;\r
-                       } \r
-                       trans.error().log(e);\r
-                       return rv;\r
-               }\r
-       }\r
-\r
-       public void close(TransStore trans) {\r
-               if(owningDAO==null) {\r
-                       if(session!=null) {\r
-                               TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);\r
-                               try {\r
-                                       session.close();\r
-                               } finally {\r
-                                       tt.done();\r
-                               }\r
-                               session = null;\r
-                       } else {\r
-                               trans.debug().log("close called(), Session already closed");\r
-                       }\r
-               } else {\r
-                       owningDAO.close(trans);\r
-               }\r
-       }\r
-\r
-       protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {\r
-       }\r
-       \r
-       protected interface Accept<DATA> {\r
-               public boolean ok(DATA data);\r
-       }\r
-\r
-}\r
-\r
-\r
-\r