Update project structure to org.onap.aaf
[aaf/authz.git] / authz-cass / src / main / java / org / onap / aaf / dao / AbsCassDAO.java
diff --git a/authz-cass/src/main/java/org/onap/aaf/dao/AbsCassDAO.java b/authz-cass/src/main/java/org/onap/aaf/dao/AbsCassDAO.java
new file mode 100644 (file)
index 0000000..c76a88f
--- /dev/null
@@ -0,0 +1,497 @@
+/*******************************************************************************\r
+ * ============LICENSE_START====================================================\r
+ * * org.onap.aaf\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ *  *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ *  * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+package org.onap.aaf.dao;\r
+\r
+import java.io.IOException;\r
+import java.util.ArrayList;\r
+import java.util.Deque;\r
+import java.util.List;\r
+import java.util.concurrent.ConcurrentLinkedDeque;\r
+\r
+import org.onap.aaf.authz.layer.Result;\r
+import org.onap.aaf.dao.aaf.cass.Status;\r
+\r
+import org.onap.aaf.inno.env.APIException;\r
+import org.onap.aaf.inno.env.Env;\r
+import org.onap.aaf.inno.env.Slot;\r
+import org.onap.aaf.inno.env.TimeTaken;\r
+import org.onap.aaf.inno.env.TransStore;\r
+import com.datastax.driver.core.BoundStatement;\r
+import com.datastax.driver.core.Cluster;\r
+import com.datastax.driver.core.ConsistencyLevel;\r
+import com.datastax.driver.core.ResultSet;\r
+import com.datastax.driver.core.ResultSetFuture;\r
+import com.datastax.driver.core.Row;\r
+import com.datastax.driver.core.Session;\r
+import com.datastax.driver.core.exceptions.DriverException;\r
+\r
+public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {\r
+       protected static final char DOT = '.';\r
+       protected static final char DOT_PLUS_ONE = '.'+1;\r
+       protected static final String FIRST_CHAR = Character.toString((char)0);\r
+       protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);\r
+       protected static final int FIELD_COMMAS = 0;\r
+       protected static final int QUESTION_COMMAS = 1;\r
+       protected static final int ASSIGNMENT_COMMAS = 2;\r
+       protected static final int WHERE_ANDS = 3;\r
+       \r
+       private Cluster cluster; \r
+       private Session session;\r
+       private final String keyspace;\r
+       // If this is null, then we own session\r
+       private final AbsCassDAO<TRANS,?> owningDAO;\r
+       protected Class<DATA> dataClass;\r
+       private final String name;\r
+       private static Slot sessionSlot;\r
+       //private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<TransStore,?>.PSInfo>();\r
+       private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo>();\r
+       private static final List<Object> EMPTY = new ArrayList<Object>(0);\r
+       private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();\r
+       private static boolean resetTrigger = false;\r
+       private static long nextAvailableReset = 0;\r
+       \r
+\r
+       public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {\r
+               this.name = name;\r
+               this.cluster = cluster;\r
+               this.keyspace = keyspace;\r
+               owningDAO = null;  // we own session\r
+               session = null;\r
+               this.dataClass = dataClass;\r
+               \r
+       }\r
+\r
+       public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {      \r
+               this.name = name;\r
+               cluster = aDao.cluster;\r
+               keyspace = aDao.keyspace;\r
+               session = null;\r
+               owningDAO = aDao; // We do not own session\r
+               this.dataClass = dataClass;\r
+       }\r
+       \r
+       public static void setSessionSlot(Slot slot) {\r
+               sessionSlot = slot;\r
+       }\r
+\r
+       //Note: Lower case ON PURPOSE. These names used to create History Messages\r
+       public enum CRUD {\r
+               create,read,update,delete\r
+       ;\r
+\r
+}\r
+\r
+       public class PSInfo {\r
+               private BoundStatement ps;\r
+               private final int size;\r
+               private final Loader<DATA> loader;\r
+               private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader\r
+               private final String cql;\r
+               private final ConsistencyLevel consistency;\r
+\r
+\r
+               /**\r
+                * Create a PSInfo and create Prepared Statement\r
+                * \r
+                * @param trans\r
+                * @param theCQL\r
+                * @param loader\r
+                */\r
+               public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {\r
+                       this.loader = loader;\r
+                       this.consistency=consistency;\r
+                       psinfos.add(this);\r
+\r
+                       cql = theCQL.trim().toUpperCase();\r
+                       if(cql.startsWith("INSERT")) {\r
+                               crud = CRUD.create;\r
+                       } else if(cql.startsWith("UPDATE")) {\r
+                               crud = CRUD.update;\r
+                       } else if(cql.startsWith("DELETE")) {\r
+                               crud = CRUD.delete;\r
+                       } else {\r
+                               crud = CRUD.read;\r
+                       }\r
+                       \r
+                       int idx = 0, count=0;\r
+                       while((idx=cql.indexOf('?',idx))>=0) {\r
+                               ++idx;\r
+                               ++count;\r
+                       }\r
+                       size=count;\r
+               }\r
+               \r
+               public synchronized void reset() {\r
+                       ps = null;\r
+               }\r
+               \r
+               private BoundStatement ps(TransStore trans) throws APIException, IOException {\r
+                       if(ps==null) {\r
+                               synchronized(this) {\r
+                                       if(ps==null) {\r
+                                               TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);\r
+                                               try {\r
+                                                       ps = new BoundStatement(getSession(trans).prepare(cql));\r
+                                                       ps.setConsistencyLevel(consistency);\r
+                                               } catch (DriverException e) {\r
+                                                       reportPerhapsReset(trans,e);\r
+                                                       throw e;\r
+                                               } finally {\r
+                                                       tt.done();\r
+                                               }\r
+                                       }\r
+                               }\r
+                       }\r
+                       return ps;\r
+               }\r
+\r
+               /**\r
+                * Execute a Prepared Statement by extracting from DATA object\r
+                * \r
+                * @param trans\r
+                * @param text\r
+                * @param data\r
+                * @return\r
+                */\r
+               public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {\r
+                       TimeTaken tt = trans.start(text, Env.REMOTE);\r
+                       try {\r
+                               return Result.ok(getSession(trans).executeAsync(\r
+                                               ps(trans).bind(loader.extract(data, size, crud))));\r
+                       } catch (DriverException | APIException | IOException e) {\r
+                               AbsCassDAO.this.reportPerhapsReset(trans,e);\r
+                               return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
+                       } finally {\r
+                               tt.done();\r
+                       }\r
+               }\r
+\r
+               /**\r
+                * Execute a Prepared Statement on Object[] key\r
+                * \r
+                * @param trans\r
+                * @param text\r
+                * @param objs\r
+                * @return\r
+                */\r
+               public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {\r
+                       TimeTaken tt = trans.start(text, Env.REMOTE);\r
+                       try {\r
+                               return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));\r
+                       } catch (DriverException | APIException | IOException e) {\r
+                               AbsCassDAO.this.reportPerhapsReset(trans,e);\r
+                               return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
+                       } finally {\r
+                               tt.done();\r
+                       }\r
+               }\r
+               \r
+               /* \r
+                * Note:\r
+                * \r
+                */\r
+\r
+               /**\r
+                * Execute a Prepared Statement by extracting from DATA object\r
+                * \r
+                * @param trans\r
+                * @param text\r
+                * @param data\r
+                * @return\r
+                */\r
+               public Result<ResultSet> exec(TRANS trans, String text, DATA data) {\r
+                       TimeTaken tt = trans.start(text, Env.REMOTE);\r
+                       try {\r
+                               /*\r
+                                * "execute" (and executeAsync)\r
+                                * Executes the provided query.\r
+                                       This method blocks until at least some result has been received from the database. However, \r
+                                       for SELECT queries, it does not guarantee that the result has been received in full. But it \r
+                                       does guarantee that some response has been received from the database, and in particular \r
+                                       guarantee that if the request is invalid, an exception will be thrown by this method.\r
+\r
+                                       Parameters:\r
+                                       statement - the CQL query to execute (that can be any Statement).\r
+                                       Returns:\r
+                                               the result of the query. That result will never be null but can be empty (and will \r
+                                               be for any non SELECT query).\r
+                                */\r
+                               return Result.ok(getSession(trans).execute(\r
+                                               ps(trans).bind(loader.extract(data, size, crud))));\r
+                       } catch (DriverException | APIException | IOException e) {\r
+                               AbsCassDAO.this.reportPerhapsReset(trans,e);\r
+                               return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
+                       } finally {\r
+                               tt.done();\r
+                       }\r
+               }\r
+\r
+               /**\r
+                * Execute a Prepared Statement on Object[] key\r
+                * \r
+                * @param trans\r
+                * @param text\r
+                * @param objs\r
+                * @return\r
+                */\r
+               public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {\r
+                       TimeTaken tt = trans.start(text, Env.REMOTE);\r
+                       try {\r
+                               return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));\r
+                       } catch (DriverException | APIException | IOException e) {\r
+                               AbsCassDAO.this.reportPerhapsReset(trans,e);\r
+                               return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
+                       } finally {\r
+                               tt.done();\r
+                       }\r
+               }\r
+\r
+               /**\r
+                * Read the Data from Cassandra given a Prepared Statement (defined by the\r
+                * DAO Instance)\r
+                *\r
+                * This is common behavior among all DAOs.\r
+                * @throws DAOException\r
+                */\r
+               public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {\r
+                       TimeTaken tt = trans.start(text,Env.REMOTE);\r
+                       \r
+                       ResultSet rs;\r
+                       try {\r
+                               rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));\r
+/// TEST CODE for Exception                            \r
+//                             boolean force = true; \r
+//                             if(force) {\r
+//                                     Map<InetSocketAddress, Throwable> misa = new HashMap<InetSocketAddress,Throwable>();\r
+//                                     //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));\r
+//                                     misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));\r
+//                                     throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);\r
+////                                   throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");\r
+//                             }\r
+//// END TEST CODE\r
+                       } catch (DriverException | APIException | IOException e) {\r
+                               AbsCassDAO.this.reportPerhapsReset(trans,e);\r
+                               return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
+                       } finally {\r
+                               tt.done();\r
+                       }\r
+                       \r
+                       return extract(loader,rs,null /*let Array be created if necessary*/,dflt);\r
+               }\r
+               \r
+               public Result<List<DATA>> read(TRANS trans, String text, DATA data) {\r
+                       return read(trans,text, loader.extract(data, size, crud));\r
+               }\r
+               \r
+               public Object[] keyFrom(DATA data) {\r
+                       return loader.extract(data, size, CRUD.delete); // Delete is key only\r
+               }\r
+\r
+               /*\r
+                * Note: in case PSInfos are deleted, we want to remove them from list.  This is not expected, \r
+                * but we don't want a data leak if it does.  Finalize doesn't have to happen quickly\r
+                */\r
+               @Override\r
+               protected void finalize() throws Throwable {\r
+                       psinfos.remove(this);\r
+               }\r
+       }\r
+\r
+       protected final Accept<DATA> dflt = new Accept<DATA>() {\r
+               @Override\r
+               public boolean ok(DATA data) {\r
+                       return true;\r
+               }\r
+       };\r
+\r
+\r
+       @SuppressWarnings("unchecked")\r
+    protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {\r
+               List<Row> rows = rs.all();\r
+               if(rows.isEmpty()) {\r
+                       return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);\r
+               } else {\r
+                       DATA d;\r
+                       List<DATA> data = indata==null?new ArrayList<DATA>(rows.size()):indata;\r
+                       \r
+                       for(Row row : rows) {\r
+                               try {\r
+                                       d = loader.load(dataClass.newInstance(),row);\r
+                                       if(accept.ok(d)) {\r
+                                               data.add(d);\r
+                                       }\r
+                               } catch(Exception e) {\r
+                                       return Result.err(e);\r
+                               }\r
+                       }\r
+                       return Result.ok(data);\r
+               }\r
+    }\r
+    \r
+       private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";\r
+       private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";\r
+       private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";\r
+\r
+       private static class ResetRequest {\r
+               //package on purpose\r
+               Session session;\r
+               long timestamp;\r
+               \r
+               public ResetRequest(Session session) {\r
+                       this.session = session;\r
+                       timestamp = System.currentTimeMillis();\r
+               }\r
+       }\r
+\r
+       \r
+       public static final void primePSIs(TransStore trans) throws APIException, IOException {\r
+               for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {\r
+                       if(psi.ps==null) {\r
+                               psi.ps(trans);\r
+                       }\r
+               }\r
+       }\r
+       \r
+       public final Session getSession(TransStore trans) throws APIException, IOException {\r
+               // Try to use Trans' session, if exists\r
+               if(sessionSlot!=null) { // try to get from Trans\r
+                       Session sess = trans.get(sessionSlot, null);\r
+                       if(sess!=null) {\r
+                               return sess;\r
+                       }\r
+               }\r
+               \r
+               // If there's an owning DAO, use it's session\r
+               if(owningDAO!=null) {\r
+                       return owningDAO.getSession(trans);\r
+               }\r
+               \r
+               // OK, nothing else works... get our own.\r
+               if(session==null || resetTrigger) {\r
+                       Cluster tempCluster = null;\r
+                       Session tempSession = null;\r
+                       try {\r
+                               synchronized(NEW_CASSANDRA_SESSION_CREATED) {\r
+                                       boolean reset = false;\r
+                                       for(ResetRequest r : resetDeque) {\r
+                                               if(r.session == session) {\r
+                                                       if(r.timestamp>nextAvailableReset) {\r
+                                                               reset=true;\r
+                                                               nextAvailableReset = System.currentTimeMillis() + 60000;\r
+                                                               tempCluster = cluster;\r
+                                                               tempSession = session;\r
+                                                               break;\r
+                                                       } else {\r
+                                                               trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");\r
+                                                       }\r
+                                               }\r
+                                       }\r
+       \r
+                                       if(reset || session == null) {\r
+                                               TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);\r
+                                               try {\r
+                                                       // Note: Maitrayee recommended not closing the cluster, just\r
+                                                       // overwrite it. 9/30/2016 assuming same for Session\r
+                                                       // This was a bad idea.  Ran out of File Handles as I suspected..\r
+                                                       if(reset) {\r
+                                                               for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {\r
+                                                                       psi.reset();\r
+                                                               }\r
+                                                       }\r
+                                                       if(reset || cluster==null) {\r
+                                                               cluster = CassAccess.cluster(trans, keyspace);\r
+                                                               trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);\r
+                                                       }\r
+                                                       if(reset || session==null) {\r
+                                                               session = cluster.connect(keyspace);\r
+                                                               trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);\r
+                                                       }\r
+                                               } finally {\r
+                                                       resetTrigger=false;\r
+                                                       tt.done();\r
+                                               }\r
+                                       }\r
+                               }\r
+                       } finally {\r
+                               TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);\r
+                               try {\r
+                                       resetDeque.clear();\r
+                                       // Not clearing Session/Cluster appears to kill off FileHandles\r
+                                       if(tempSession!=null && !tempSession.isClosed()) {\r
+                                               tempSession.close();\r
+                                       }\r
+                                       if(tempCluster!=null && !tempCluster.isClosed()) {\r
+                                               tempCluster.close();\r
+                                       }\r
+                               } finally {\r
+                                       tt.done();\r
+                               }\r
+                       }\r
+               }\r
+               return session;\r
+       }\r
+       \r
+       public final boolean reportPerhapsReset(TransStore trans, Exception e) {\r
+               if(owningDAO!=null) {\r
+                       return owningDAO.reportPerhapsReset(trans, e);\r
+               } else {\r
+                       boolean rv = false;\r
+                       if(CassAccess.isResetException(e)) {\r
+                               trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());\r
+                               resetDeque.addFirst(new ResetRequest(session));\r
+                               rv = resetTrigger = true;\r
+                       } \r
+                       trans.error().log(e);\r
+                       return rv;\r
+               }\r
+       }\r
+\r
+       public void close(TransStore trans) {\r
+               if(owningDAO==null) {\r
+                       if(session!=null) {\r
+                               TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);\r
+                               try {\r
+                                       session.close();\r
+                               } finally {\r
+                                       tt.done();\r
+                               }\r
+                               session = null;\r
+                       } else {\r
+                               trans.debug().log("close called(), Session already closed");\r
+                       }\r
+               } else {\r
+                       owningDAO.close(trans);\r
+               }\r
+       }\r
+\r
+       protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {\r
+       }\r
+       \r
+       protected interface Accept<DATA> {\r
+               public boolean ok(DATA data);\r
+       }\r
+\r
+}\r
+\r
+\r
+\r