--- /dev/null
+/*******************************************************************************\r
+ * ============LICENSE_START====================================================\r
+ * * org.onap.aai\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * Copyright © 2017 Amdocs\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ * * http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ * * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+package com.att.dao;\r
+\r
+import java.io.IOException;\r
+import java.util.ArrayList;\r
+import java.util.Deque;\r
+import java.util.List;\r
+import java.util.concurrent.ConcurrentLinkedDeque;\r
+\r
+import com.att.authz.layer.Result;\r
+import com.att.dao.aaf.cass.Status;\r
+import com.att.inno.env.APIException;\r
+import com.att.inno.env.Env;\r
+import com.att.inno.env.Slot;\r
+import com.att.inno.env.TimeTaken;\r
+import com.att.inno.env.TransStore;\r
+import com.datastax.driver.core.BoundStatement;\r
+import com.datastax.driver.core.Cluster;\r
+import com.datastax.driver.core.ConsistencyLevel;\r
+import com.datastax.driver.core.ResultSet;\r
+import com.datastax.driver.core.ResultSetFuture;\r
+import com.datastax.driver.core.Row;\r
+import com.datastax.driver.core.Session;\r
+import com.datastax.driver.core.exceptions.DriverException;\r
+\r
+public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {\r
+ protected static final char DOT = '.';\r
+ protected static final char DOT_PLUS_ONE = '.'+1;\r
+ protected static final String FIRST_CHAR = Character.toString((char)0);\r
+ protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);\r
+ protected static final int FIELD_COMMAS = 0;\r
+ protected static final int QUESTION_COMMAS = 1;\r
+ protected static final int ASSIGNMENT_COMMAS = 2;\r
+ protected static final int WHERE_ANDS = 3;\r
+ \r
+ private Cluster cluster; \r
+ private Session session;\r
+ private final String keyspace;\r
+ // If this is null, then we own session\r
+ private final AbsCassDAO<TRANS,?> owningDAO;\r
+ protected Class<DATA> dataClass;\r
+ private final String name;\r
+ private static Slot sessionSlot;\r
+ //private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<TransStore,?>.PSInfo>();\r
+ private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo>();\r
+ private static final List<Object> EMPTY = new ArrayList<Object>(0);\r
+ private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();\r
+ private static boolean resetTrigger = false;\r
+ private static long nextAvailableReset = 0;\r
+ \r
+\r
+ public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {\r
+ this.name = name;\r
+ this.cluster = cluster;\r
+ this.keyspace = keyspace;\r
+ owningDAO = null; // we own session\r
+ session = null;\r
+ this.dataClass = dataClass;\r
+ \r
+ }\r
+\r
+ public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) { \r
+ this.name = name;\r
+ cluster = aDao.cluster;\r
+ keyspace = aDao.keyspace;\r
+ session = null;\r
+ owningDAO = aDao; // We do not own session\r
+ this.dataClass = dataClass;\r
+ }\r
+ \r
+ public static void setSessionSlot(Slot slot) {\r
+ sessionSlot = slot;\r
+ }\r
+\r
+ //Note: Lower case ON PURPOSE. These names used to create History Messages\r
+ public enum CRUD {\r
+ create,read,update,delete\r
+ ;\r
+\r
+}\r
+\r
+ public class PSInfo {\r
+ private BoundStatement ps;\r
+ private final int size;\r
+ private final Loader<DATA> loader;\r
+ private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader\r
+ private final String cql;\r
+ private final ConsistencyLevel consistency;\r
+\r
+\r
+ /**\r
+ * Create a PSInfo and create Prepared Statement\r
+ * \r
+ * @param trans\r
+ * @param theCQL\r
+ * @param loader\r
+ */\r
+ public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {\r
+ this.loader = loader;\r
+ this.consistency=consistency;\r
+ psinfos.add(this);\r
+\r
+ cql = theCQL.trim().toUpperCase();\r
+ if(cql.startsWith("INSERT")) {\r
+ crud = CRUD.create;\r
+ } else if(cql.startsWith("UPDATE")) {\r
+ crud = CRUD.update;\r
+ } else if(cql.startsWith("DELETE")) {\r
+ crud = CRUD.delete;\r
+ } else {\r
+ crud = CRUD.read;\r
+ }\r
+ \r
+ int idx = 0, count=0;\r
+ while((idx=cql.indexOf('?',idx))>=0) {\r
+ ++idx;\r
+ ++count;\r
+ }\r
+ size=count;\r
+ }\r
+ \r
+ public synchronized void reset() {\r
+ ps = null;\r
+ }\r
+ \r
+ private BoundStatement ps(TransStore trans) throws APIException, IOException {\r
+ if(ps==null) {\r
+ synchronized(this) {\r
+ if(ps==null) {\r
+ TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);\r
+ try {\r
+ ps = new BoundStatement(getSession(trans).prepare(cql));\r
+ ps.setConsistencyLevel(consistency);\r
+ } catch (DriverException e) {\r
+ reportPerhapsReset(trans,e);\r
+ throw e;\r
+ } finally {\r
+ tt.done();\r
+ }\r
+ }\r
+ }\r
+ }\r
+ return ps;\r
+ }\r
+\r
+ /**\r
+ * Execute a Prepared Statement by extracting from DATA object\r
+ * \r
+ * @param trans\r
+ * @param text\r
+ * @param data\r
+ * @return\r
+ */\r
+ public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {\r
+ TimeTaken tt = trans.start(text, Env.REMOTE);\r
+ try {\r
+ return Result.ok(getSession(trans).executeAsync(\r
+ ps(trans).bind(loader.extract(data, size, crud))));\r
+ } catch (DriverException | APIException | IOException e) {\r
+ AbsCassDAO.this.reportPerhapsReset(trans,e);\r
+ return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
+ } finally {\r
+ tt.done();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Execute a Prepared Statement on Object[] key\r
+ * \r
+ * @param trans\r
+ * @param text\r
+ * @param objs\r
+ * @return\r
+ */\r
+ public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {\r
+ TimeTaken tt = trans.start(text, Env.REMOTE);\r
+ try {\r
+ return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));\r
+ } catch (DriverException | APIException | IOException e) {\r
+ AbsCassDAO.this.reportPerhapsReset(trans,e);\r
+ return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
+ } finally {\r
+ tt.done();\r
+ }\r
+ }\r
+ \r
+ /* \r
+ * Note:\r
+ * \r
+ */\r
+\r
+ /**\r
+ * Execute a Prepared Statement by extracting from DATA object\r
+ * \r
+ * @param trans\r
+ * @param text\r
+ * @param data\r
+ * @return\r
+ */\r
+ public Result<ResultSet> exec(TRANS trans, String text, DATA data) {\r
+ TimeTaken tt = trans.start(text, Env.REMOTE);\r
+ try {\r
+ /*\r
+ * "execute" (and executeAsync)\r
+ * Executes the provided query.\r
+ This method blocks until at least some result has been received from the database. However, \r
+ for SELECT queries, it does not guarantee that the result has been received in full. But it \r
+ does guarantee that some response has been received from the database, and in particular \r
+ guarantee that if the request is invalid, an exception will be thrown by this method.\r
+\r
+ Parameters:\r
+ statement - the CQL query to execute (that can be any Statement).\r
+ Returns:\r
+ the result of the query. That result will never be null but can be empty (and will \r
+ be for any non SELECT query).\r
+ */\r
+ return Result.ok(getSession(trans).execute(\r
+ ps(trans).bind(loader.extract(data, size, crud))));\r
+ } catch (DriverException | APIException | IOException e) {\r
+ AbsCassDAO.this.reportPerhapsReset(trans,e);\r
+ return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
+ } finally {\r
+ tt.done();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Execute a Prepared Statement on Object[] key\r
+ * \r
+ * @param trans\r
+ * @param text\r
+ * @param objs\r
+ * @return\r
+ */\r
+ public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {\r
+ TimeTaken tt = trans.start(text, Env.REMOTE);\r
+ try {\r
+ return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));\r
+ } catch (DriverException | APIException | IOException e) {\r
+ AbsCassDAO.this.reportPerhapsReset(trans,e);\r
+ return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
+ } finally {\r
+ tt.done();\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Read the Data from Cassandra given a Prepared Statement (defined by the\r
+ * DAO Instance)\r
+ *\r
+ * This is common behavior among all DAOs.\r
+ * @throws DAOException\r
+ */\r
+ public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {\r
+ TimeTaken tt = trans.start(text,Env.REMOTE);\r
+ \r
+ ResultSet rs;\r
+ try {\r
+ rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));\r
+/// TEST CODE for Exception \r
+// boolean force = true; \r
+// if(force) {\r
+// Map<InetSocketAddress, Throwable> misa = new HashMap<InetSocketAddress,Throwable>();\r
+// //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));\r
+// misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));\r
+// throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);\r
+//// throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");\r
+// }\r
+//// END TEST CODE\r
+ } catch (DriverException | APIException | IOException e) {\r
+ AbsCassDAO.this.reportPerhapsReset(trans,e);\r
+ return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
+ } finally {\r
+ tt.done();\r
+ }\r
+ \r
+ return extract(loader,rs,null /*let Array be created if necessary*/,dflt);\r
+ }\r
+ \r
+ public Result<List<DATA>> read(TRANS trans, String text, DATA data) {\r
+ return read(trans,text, loader.extract(data, size, crud));\r
+ }\r
+ \r
+ public Object[] keyFrom(DATA data) {\r
+ return loader.extract(data, size, CRUD.delete); // Delete is key only\r
+ }\r
+\r
+ /*\r
+ * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected, \r
+ * but we don't want a data leak if it does. Finalize doesn't have to happen quickly\r
+ */\r
+ @Override\r
+ protected void finalize() throws Throwable {\r
+ psinfos.remove(this);\r
+ }\r
+ }\r
+\r
+ protected final Accept<DATA> dflt = new Accept<DATA>() {\r
+ @Override\r
+ public boolean ok(DATA data) {\r
+ return true;\r
+ }\r
+ };\r
+\r
+\r
+ @SuppressWarnings("unchecked")\r
+ protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {\r
+ List<Row> rows = rs.all();\r
+ if(rows.isEmpty()) {\r
+ return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);\r
+ } else {\r
+ DATA d;\r
+ List<DATA> data = indata==null?new ArrayList<DATA>(rows.size()):indata;\r
+ \r
+ for(Row row : rows) {\r
+ try {\r
+ d = loader.load(dataClass.newInstance(),row);\r
+ if(accept.ok(d)) {\r
+ data.add(d);\r
+ }\r
+ } catch(Exception e) {\r
+ return Result.err(e);\r
+ }\r
+ }\r
+ return Result.ok(data);\r
+ }\r
+ }\r
+ \r
+ private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";\r
+ private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";\r
+ private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";\r
+\r
+ private static class ResetRequest {\r
+ //package on purpose\r
+ Session session;\r
+ long timestamp;\r
+ \r
+ public ResetRequest(Session session) {\r
+ this.session = session;\r
+ timestamp = System.currentTimeMillis();\r
+ }\r
+ }\r
+\r
+ \r
+ public static final void primePSIs(TransStore trans) throws APIException, IOException {\r
+ for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {\r
+ if(psi.ps==null) {\r
+ psi.ps(trans);\r
+ }\r
+ }\r
+ }\r
+ \r
+ public final Session getSession(TransStore trans) throws APIException, IOException {\r
+ // Try to use Trans' session, if exists\r
+ if(sessionSlot!=null) { // try to get from Trans\r
+ Session sess = trans.get(sessionSlot, null);\r
+ if(sess!=null) {\r
+ return sess;\r
+ }\r
+ }\r
+ \r
+ // If there's an owning DAO, use it's session\r
+ if(owningDAO!=null) {\r
+ return owningDAO.getSession(trans);\r
+ }\r
+ \r
+ // OK, nothing else works... get our own.\r
+ if(session==null || resetTrigger) {\r
+ Cluster tempCluster = null;\r
+ Session tempSession = null;\r
+ try {\r
+ synchronized(NEW_CASSANDRA_SESSION_CREATED) {\r
+ boolean reset = false;\r
+ for(ResetRequest r : resetDeque) {\r
+ if(r.session == session) {\r
+ if(r.timestamp>nextAvailableReset) {\r
+ reset=true;\r
+ nextAvailableReset = System.currentTimeMillis() + 60000;\r
+ tempCluster = cluster;\r
+ tempSession = session;\r
+ break;\r
+ } else {\r
+ trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");\r
+ }\r
+ }\r
+ }\r
+ \r
+ if(reset || session == null) {\r
+ TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);\r
+ try {\r
+ // Note: Maitrayee recommended not closing the cluster, just\r
+ // overwrite it. 9/30/2016 assuming same for Session\r
+ // This was a bad idea. Ran out of File Handles as I suspected..\r
+ if(reset) {\r
+ for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {\r
+ psi.reset();\r
+ }\r
+ }\r
+ if(reset || cluster==null) {\r
+ cluster = CassAccess.cluster(trans, keyspace);\r
+ trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);\r
+ }\r
+ if(reset || session==null) {\r
+ session = cluster.connect(keyspace);\r
+ trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);\r
+ }\r
+ } finally {\r
+ resetTrigger=false;\r
+ tt.done();\r
+ }\r
+ }\r
+ }\r
+ } finally {\r
+ TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);\r
+ try {\r
+ resetDeque.clear();\r
+ // Not clearing Session/Cluster appears to kill off FileHandles\r
+ if(tempSession!=null && !tempSession.isClosed()) {\r
+ tempSession.close();\r
+ }\r
+ if(tempCluster!=null && !tempCluster.isClosed()) {\r
+ tempCluster.close();\r
+ }\r
+ } finally {\r
+ tt.done();\r
+ }\r
+ }\r
+ }\r
+ return session;\r
+ }\r
+ \r
+ public final boolean reportPerhapsReset(TransStore trans, Exception e) {\r
+ if(owningDAO!=null) {\r
+ return owningDAO.reportPerhapsReset(trans, e);\r
+ } else {\r
+ boolean rv = false;\r
+ if(CassAccess.isResetException(e)) {\r
+ trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());\r
+ resetDeque.addFirst(new ResetRequest(session));\r
+ rv = resetTrigger = true;\r
+ } \r
+ trans.error().log(e);\r
+ return rv;\r
+ }\r
+ }\r
+\r
+ public void close(TransStore trans) {\r
+ if(owningDAO==null) {\r
+ if(session!=null) {\r
+ TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);\r
+ try {\r
+ session.close();\r
+ } finally {\r
+ tt.done();\r
+ }\r
+ session = null;\r
+ } else {\r
+ trans.debug().log("close called(), Session already closed");\r
+ }\r
+ } else {\r
+ owningDAO.close(trans);\r
+ }\r
+ }\r
+\r
+ protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {\r
+ }\r
+ \r
+ protected interface Accept<DATA> {\r
+ public boolean ok(DATA data);\r
+ }\r
+\r
+}\r
+\r
+\r
+\r