/******************************************************************************* * ============LICENSE_START==================================================== * * org.onap.aaf * * =========================================================================== * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * * =========================================================================== * * Licensed under the Apache License, Version 2.0 (the "License"); * * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at * * * * http://www.apache.org/licenses/LICENSE-2.0 * * * * Unless required by applicable law or agreed to in writing, software * * distributed under the License is distributed on an "AS IS" BASIS, * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * * See the License for the specific language governing permissions and * * limitations under the License. * * ============LICENSE_END==================================================== * * * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * * ******************************************************************************/ package org.onap.aaf.dao; import java.io.IOException; import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; import org.onap.aaf.authz.layer.Result; import org.onap.aaf.dao.aaf.cass.Status; import org.onap.aaf.inno.env.APIException; import org.onap.aaf.inno.env.Env; import org.onap.aaf.inno.env.Slot; import org.onap.aaf.inno.env.TimeTaken; import org.onap.aaf.inno.env.TransStore; import com.datastax.driver.core.BoundStatement; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.DriverException; public abstract class AbsCassDAO { protected static final char DOT = '.'; protected static final char DOT_PLUS_ONE = '.'+1; protected static final String FIRST_CHAR = Character.toString((char)0); protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE); protected static final int FIELD_COMMAS = 0; protected static final int QUESTION_COMMAS = 1; protected static final int ASSIGNMENT_COMMAS = 2; protected static final int WHERE_ANDS = 3; private Cluster cluster; private Session session; private final String keyspace; // If this is null, then we own session private final AbsCassDAO owningDAO; protected Class dataClass; private final String name; private static Slot sessionSlot; //private static final ArrayList.PSInfo> psinfos = new ArrayList.PSInfo>(); private static final ArrayList.PSInfo> psinfos = new ArrayList.PSInfo>(); private static final List EMPTY = new ArrayList(0); private static final Deque resetDeque = new ConcurrentLinkedDeque(); private static boolean resetTrigger = false; private static long nextAvailableReset = 0; public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class dataClass) { this.name = name; this.cluster = cluster; this.keyspace = keyspace; owningDAO = null; // we own session session = null; this.dataClass = dataClass; } public AbsCassDAO(TRANS trans, String name, AbsCassDAO aDao, Class dataClass) { this.name = name; cluster = aDao.cluster; keyspace = aDao.keyspace; session = null; owningDAO = aDao; // We do not own session this.dataClass = dataClass; } public static void setSessionSlot(Slot slot) { sessionSlot = slot; } //Note: Lower case ON PURPOSE. These names used to create History Messages public enum CRUD { create,read,update,delete ; } public class PSInfo { private BoundStatement ps; private final int size; private final Loader loader; private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader private final String cql; private final ConsistencyLevel consistency; /** * Create a PSInfo and create Prepared Statement * * @param trans * @param theCQL * @param loader */ public PSInfo(TRANS trans, String theCQL, Loader loader, ConsistencyLevel consistency) { this.loader = loader; this.consistency=consistency; psinfos.add(this); cql = theCQL.trim().toUpperCase(); if(cql.startsWith("INSERT")) { crud = CRUD.create; } else if(cql.startsWith("UPDATE")) { crud = CRUD.update; } else if(cql.startsWith("DELETE")) { crud = CRUD.delete; } else { crud = CRUD.read; } int idx = 0, count=0; while((idx=cql.indexOf('?',idx))>=0) { ++idx; ++count; } size=count; } public synchronized void reset() { ps = null; } private BoundStatement ps(TransStore trans) throws APIException, IOException { if(ps==null) { synchronized(this) { if(ps==null) { TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB); try { ps = new BoundStatement(getSession(trans).prepare(cql)); ps.setConsistencyLevel(consistency); } catch (DriverException e) { reportPerhapsReset(trans,e); throw e; } finally { tt.done(); } } } } return ps; } /** * Execute a Prepared Statement by extracting from DATA object * * @param trans * @param text * @param data * @return */ public Result execAsync(TRANS trans, String text, DATA data) { TimeTaken tt = trans.start(text, Env.REMOTE); try { return Result.ok(getSession(trans).executeAsync( ps(trans).bind(loader.extract(data, size, crud)))); } catch (DriverException | APIException | IOException e) { AbsCassDAO.this.reportPerhapsReset(trans,e); return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); } finally { tt.done(); } } /** * Execute a Prepared Statement on Object[] key * * @param trans * @param text * @param objs * @return */ public Result execAsync(TRANS trans, String text, Object ... objs) { TimeTaken tt = trans.start(text, Env.REMOTE); try { return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs))); } catch (DriverException | APIException | IOException e) { AbsCassDAO.this.reportPerhapsReset(trans,e); return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); } finally { tt.done(); } } /* * Note: * */ /** * Execute a Prepared Statement by extracting from DATA object * * @param trans * @param text * @param data * @return */ public Result exec(TRANS trans, String text, DATA data) { TimeTaken tt = trans.start(text, Env.REMOTE); try { /* * "execute" (and executeAsync) * Executes the provided query. This method blocks until at least some result has been received from the database. However, for SELECT queries, it does not guarantee that the result has been received in full. But it does guarantee that some response has been received from the database, and in particular guarantee that if the request is invalid, an exception will be thrown by this method. Parameters: statement - the CQL query to execute (that can be any Statement). Returns: the result of the query. That result will never be null but can be empty (and will be for any non SELECT query). */ return Result.ok(getSession(trans).execute( ps(trans).bind(loader.extract(data, size, crud)))); } catch (DriverException | APIException | IOException e) { AbsCassDAO.this.reportPerhapsReset(trans,e); return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); } finally { tt.done(); } } /** * Execute a Prepared Statement on Object[] key * * @param trans * @param text * @param objs * @return */ public Result exec(TRANS trans, String text, Object ... objs) { TimeTaken tt = trans.start(text, Env.REMOTE); try { return Result.ok(getSession(trans).execute(ps(trans).bind(objs))); } catch (DriverException | APIException | IOException e) { AbsCassDAO.this.reportPerhapsReset(trans,e); return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); } finally { tt.done(); } } /** * Read the Data from Cassandra given a Prepared Statement (defined by the * DAO Instance) * * This is common behavior among all DAOs. * @throws DAOException */ public Result> read(TRANS trans, String text, Object[] key) { TimeTaken tt = trans.start(text,Env.REMOTE); ResultSet rs; try { rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key)); /// TEST CODE for Exception // boolean force = true; // if(force) { // Map misa = new HashMap(); // //misa.put(new InetSocketAddress(444),new Exception("no host was tried")); // misa.put(new InetSocketAddress(444),new Exception("Connection has been closed")); // throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa); //// throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried"); // } //// END TEST CODE } catch (DriverException | APIException | IOException e) { AbsCassDAO.this.reportPerhapsReset(trans,e); return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); } finally { tt.done(); } return extract(loader,rs,null /*let Array be created if necessary*/,dflt); } public Result> read(TRANS trans, String text, DATA data) { return read(trans,text, loader.extract(data, size, crud)); } public Object[] keyFrom(DATA data) { return loader.extract(data, size, CRUD.delete); // Delete is key only } /* * Note: in case PSInfos are deleted, we want to remove them from list. This is not expected, * but we don't want a data leak if it does. Finalize doesn't have to happen quickly */ @Override protected void finalize() throws Throwable { psinfos.remove(this); } } protected final Accept dflt = new Accept() { @Override public boolean ok(DATA data) { return true; } }; @SuppressWarnings("unchecked") protected final Result> extract(Loader loader, ResultSet rs, List indata, Accept accept) { List rows = rs.all(); if(rows.isEmpty()) { return Result.ok((List)EMPTY); // Result sets now .emptyList(true); } else { DATA d; List data = indata==null?new ArrayList(rows.size()):indata; for(Row row : rows) { try { d = loader.load(dataClass.newInstance(),row); if(accept.ok(d)) { data.add(d); } } catch(Exception e) { return Result.err(e); } } return Result.ok(data); } } private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created"; private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created"; private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session"; private static class ResetRequest { //package on purpose Session session; long timestamp; public ResetRequest(Session session) { this.session = session; timestamp = System.currentTimeMillis(); } } public static final void primePSIs(TransStore trans) throws APIException, IOException { for(AbsCassDAO.PSInfo psi : psinfos) { if(psi.ps==null) { psi.ps(trans); } } } public final Session getSession(TransStore trans) throws APIException, IOException { // Try to use Trans' session, if exists if(sessionSlot!=null) { // try to get from Trans Session sess = trans.get(sessionSlot, null); if(sess!=null) { return sess; } } // If there's an owning DAO, use it's session if(owningDAO!=null) { return owningDAO.getSession(trans); } // OK, nothing else works... get our own. if(session==null || resetTrigger) { Cluster tempCluster = null; Session tempSession = null; try { synchronized(NEW_CASSANDRA_SESSION_CREATED) { boolean reset = false; for(ResetRequest r : resetDeque) { if(r.session == session) { if(r.timestamp>nextAvailableReset) { reset=true; nextAvailableReset = System.currentTimeMillis() + 60000; tempCluster = cluster; tempSession = session; break; } else { trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset"); } } } if(reset || session == null) { TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB); try { // Note: Maitrayee recommended not closing the cluster, just // overwrite it. 9/30/2016 assuming same for Session // This was a bad idea. Ran out of File Handles as I suspected.. if(reset) { for(AbsCassDAO.PSInfo psi : psinfos) { psi.reset(); } } if(reset || cluster==null) { cluster = CassAccess.cluster(trans, keyspace); trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED); } if(reset || session==null) { session = cluster.connect(keyspace); trans.warn().log(NEW_CASSANDRA_SESSION_CREATED); } } finally { resetTrigger=false; tt.done(); } } } } finally { TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB); try { resetDeque.clear(); // Not clearing Session/Cluster appears to kill off FileHandles if(tempSession!=null && !tempSession.isClosed()) { tempSession.close(); } if(tempCluster!=null && !tempCluster.isClosed()) { tempCluster.close(); } } finally { tt.done(); } } } return session; } public final boolean reportPerhapsReset(TransStore trans, Exception e) { if(owningDAO!=null) { return owningDAO.reportPerhapsReset(trans, e); } else { boolean rv = false; if(CassAccess.isResetException(e)) { trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName()); resetDeque.addFirst(new ResetRequest(session)); rv = resetTrigger = true; } trans.error().log(e); return rv; } } public void close(TransStore trans) { if(owningDAO==null) { if(session!=null) { TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB); try { session.close(); } finally { tt.done(); } session = null; } else { trans.debug().log("close called(), Session already closed"); } } else { owningDAO.close(trans); } } protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) { } protected interface Accept { public boolean ok(DATA data); } }