d0e4eaabf29970aee073fd8ac0f5739dbc911dab
[aaf/authz.git] / auth / auth-cass / src / main / java / org / onap / aaf / auth / dao / AbsCassDAO.java
1 /**
2  * ============LICENSE_START====================================================
3  * org.onap.aaf
4  * ===========================================================================
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6  * ===========================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END====================================================
19  *
20  */
21
22 package org.onap.aaf.auth.dao;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Deque;
27 import java.util.List;
28 import java.util.concurrent.ConcurrentLinkedDeque;
29
30 import org.onap.aaf.auth.dao.cass.Status;
31 import org.onap.aaf.auth.layer.Result;
32 import org.onap.aaf.misc.env.APIException;
33 import org.onap.aaf.misc.env.Env;
34 import org.onap.aaf.misc.env.TimeTaken;
35 import org.onap.aaf.misc.env.TransStore;
36
37 import com.datastax.driver.core.BoundStatement;
38 import com.datastax.driver.core.Cluster;
39 import com.datastax.driver.core.ConsistencyLevel;
40 import com.datastax.driver.core.PreparedStatement;
41 import com.datastax.driver.core.ResultSet;
42 import com.datastax.driver.core.ResultSetFuture;
43 import com.datastax.driver.core.Row;
44 import com.datastax.driver.core.Session;
45 import com.datastax.driver.core.exceptions.DriverException;
46
47 public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {
48     protected static final char DOT = '.';
49     protected static final char DOT_PLUS_ONE = '.'+1;
50     protected static final String FIRST_CHAR = Character.toString((char)0);
51     protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);
52     protected static final int FIELD_COMMAS = 0;
53     protected static final int QUESTION_COMMAS = 1;
54     protected static final int ASSIGNMENT_COMMAS = 2;
55     protected static final int WHERE_ANDS = 3;
56
57     private Cluster cluster; 
58     /*
59      * From DataStax
60      * com.datastax.driver.core.Session
61         A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes, 
62         provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for 
63         failed query (when it makes sense), etc...
64         Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one 
65         keyspace at a time, so one instance per keyspace is necessary.
66      */
67     private Session session;
68     private final String keyspace;
69     // If this is null, then we own session
70     private final AbsCassDAO<TRANS,?> owningDAO;
71     protected Class<DATA> dataClass;
72     private final String name;
73 //    private static Slot sessionSlot; // not used since 2015
74     private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<>();
75     private static final List<Object> EMPTY = new ArrayList<>(0);
76     private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();
77     private static boolean resetTrigger = false;
78     private static long nextAvailableReset = 0;
79     
80     public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {
81         this.name = name;
82         this.cluster = cluster;
83         this.keyspace = keyspace;
84         owningDAO = null;  // we own session
85         session = null;
86         this.dataClass = dataClass;
87     }
88
89     public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {
90         this.name = name;
91         cluster = aDao.cluster;
92         keyspace = aDao.keyspace;
93         session = null;
94         // We do not own session
95         owningDAO = aDao;
96         this.dataClass = dataClass;
97     }
98     
99 // Not used since 2015
100 //    public static void setSessionSlot(Slot slot) {
101 //        sessionSlot = slot;
102 //    }
103
104     //Note: Lower case ON PURPOSE. These names used to create History Messages
105     public enum CRUD {
106         create,read,update,delete;
107     }
108
109     public class PSInfo {
110         private PreparedStatement ps;
111         private final int size;
112         private final Loader<DATA> loader;
113         private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader
114         private final String cql;
115         private final ConsistencyLevel consistency;
116
117
118         /**
119          * Create a PSInfo and create Prepared Statement
120          * 
121          * @param trans
122          * @param theCQL
123          * @param loader
124          */
125         public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {
126             this.loader = loader;
127             this.consistency=consistency;
128             psinfos.add(this);
129
130             cql = theCQL.trim().toUpperCase();
131             if (cql.startsWith("INSERT")) {
132                 crud = CRUD.create;
133             } else if (cql.startsWith("UPDATE")) {
134                 crud = CRUD.update;
135             } else if (cql.startsWith("DELETE")) {
136                 crud = CRUD.delete;
137             } else {
138                 crud = CRUD.read;
139             }
140             
141             int idx = 0, count=0;
142             while ((idx=cql.indexOf('?',idx))>=0) {
143                 ++idx;
144                 ++count;
145             }
146             size=count;
147         }
148         
149         public synchronized void reset() {
150             ps = null;
151         }
152         
153         private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException {
154             /* From Datastax
155                 You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe). 
156                 If you call prepare multiple times with the same query string, the driver will log a warning.
157             */
158             if (ps==null) {
159                 TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);
160                 try {
161                     ps = getSession(trans).prepare(cql);
162                     ps.setConsistencyLevel(consistency);
163                 } catch (DriverException e) {
164                     reportPerhapsReset(trans,e);
165                     throw e;
166                 } finally {
167                     tt.done();
168                 }
169             }
170             // BoundStatements are NOT threadsafe... need a new one each time.
171             return new BoundStatement(ps);
172         }
173
174         /**
175          * Execute a Prepared Statement by extracting from DATA object
176          * 
177          * @param trans
178          * @param text
179          * @param data
180          * @return
181          */
182         public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {
183             TimeTaken tt = trans.start(text, Env.REMOTE);
184             try {
185                 return Result.ok(getSession(trans).executeAsync(
186                         ps(trans).bind(loader.extract(data, size, crud))));
187             } catch (DriverException | APIException | IOException e) {
188                 AbsCassDAO.this.reportPerhapsReset(trans,e);
189                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
190             } finally {
191                 tt.done();
192             }
193         }
194
195         /**
196          * Execute a Prepared Statement on Object[] key
197          * 
198          * @param trans
199          * @param text
200          * @param objs
201          * @return
202          */
203         public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {
204             TimeTaken tt = trans.start(text, Env.REMOTE);
205             try {
206                 return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));
207             } catch (DriverException | APIException | IOException e) {
208                 AbsCassDAO.this.reportPerhapsReset(trans,e);
209                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
210             } finally {
211                 tt.done();
212             }
213         }
214         
215         /* 
216          * Note:
217          * 
218          */
219
220         /**
221          * Execute a Prepared Statement by extracting from DATA object
222          * 
223          * @param trans
224          * @param text
225          * @param data
226          * @return
227          */
228         public Result<ResultSet> exec(TRANS trans, String text, DATA data) {
229             TimeTaken tt = trans.start(text, Env.REMOTE);
230             try {
231                 /*
232                  * "execute" (and executeAsync)
233                  * Executes the provided query.
234                     This method blocks until at least some result has been received from the database. However, 
235                     for SELECT queries, it does not guarantee that the result has been received in full. But it 
236                     does guarantee that some response has been received from the database, and in particular 
237                     guarantee that if the request is invalid, an exception will be thrown by this method.
238
239                     Parameters:
240                     statement - the CQL query to execute (that can be any Statement).
241                     Returns:
242                         the result of the query. That result will never be null but can be empty (and will 
243                         be for any non SELECT query).
244                  */
245                 return Result.ok(getSession(trans).execute(
246                         ps(trans).bind(loader.extract(data, size, crud))));
247             } catch (DriverException | APIException | IOException e) {
248                 AbsCassDAO.this.reportPerhapsReset(trans,e);
249                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
250             } finally {
251                 tt.done();
252             }
253         }
254
255         /**
256          * Execute a Prepared Statement on Object[] key
257          * 
258          * @param trans
259          * @param text
260          * @param objs
261          * @return
262          */
263         public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {
264             TimeTaken tt = trans.start(text, Env.REMOTE);
265             try {
266                 return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));
267             } catch (DriverException | APIException | IOException e) {
268                 AbsCassDAO.this.reportPerhapsReset(trans,e);
269                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
270             } finally {
271                 tt.done();
272             }
273         }
274
275         /**
276          * Read the Data from Cassandra given a Prepared Statement (defined by the
277          * DAO Instance)
278          *
279          * This is common behavior among all DAOs.
280          * @throws DAOException
281          */
282         public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {
283             TimeTaken tt = trans.start(text,Env.REMOTE);
284             
285             ResultSet rs;
286             try {
287                 rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));
288 /// TEST CODE for Exception                
289 //                boolean force = true; 
290 //                if (force) {
291 //                    Map<InetSocketAddress, Throwable> misa = new HashMap<>();
292 //                    //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));
293 //                    misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));
294 //                    throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);
295 ////                    throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");
296 //                }
297 //// END TEST CODE
298             } catch (DriverException | APIException | IOException e) {
299                 AbsCassDAO.this.reportPerhapsReset(trans,e);
300                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
301             } finally {
302                 tt.done();
303             }
304             
305             return extract(loader,rs,null /*let Array be created if necessary*/,dflt);
306         }
307         
308         public Result<List<DATA>> read(TRANS trans, String text, DATA data) {
309             return read(trans,text, loader.extract(data, size, crud));
310         }
311         
312         public Object[] keyFrom(DATA data) {
313             return loader.extract(data, size, CRUD.delete); // Delete is key only
314         }
315
316         /*
317          * Note: in case PSInfos are deleted, we want to remove them from list.  This is not expected, 
318          * but we don't want a data leak if it does.  Finalize doesn't have to happen quickly
319          */
320         @Override
321         protected void finalize() throws Throwable {
322             psinfos.remove(this);
323         }
324     }
325
326     protected final Accept<DATA> dflt = new Accept<DATA>() {
327         @Override
328         public boolean ok(DATA data) {
329             return true;
330         }
331     };
332
333
334     @SuppressWarnings("unchecked")
335     protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {
336         List<Row> rows = rs.all();
337         if (rows.isEmpty()) {
338             return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);
339         } else {
340             DATA d;
341             List<DATA> data = indata==null?new ArrayList<>(rows.size()):indata;
342             
343             for (Row row : rows) {
344                 try {
345                     d = loader.load(dataClass.newInstance(),row);
346                     if (accept.ok(d)) {
347                         data.add(d);
348                     }
349                 } catch (Exception e) {
350                     return Result.err(e);
351                 }
352             }
353             return Result.ok(data);
354         }
355     }
356     
357     private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
358     private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
359     private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
360     private static final Object LOCK = new Object();
361
362     private static class ResetRequest {
363         //package on purpose
364         Session session;
365         long timestamp;
366         
367         public ResetRequest(Session session) {
368             this.session = session;
369             timestamp = System.currentTimeMillis();
370         }
371     }
372
373     
374     public static final void primePSIs(TransStore trans) throws APIException, IOException {
375         for (AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
376             if (psi.ps==null) {
377                 psi.ps(trans);
378             }
379         }
380     }
381     
382     public final Session getSession(TransStore trans) throws APIException, IOException {
383         // SessionFilter unused since 2015
384         // Try to use Trans' session, if exists
385 //        if (sessionSlot!=null) { // try to get from Trans
386 //            Session sess = trans.get(sessionSlot, null);
387 //            if (sess!=null) {
388 //                return sess;
389 //            }
390 //        }
391         
392         // If there's an owning DAO, use it's session
393         if (owningDAO!=null) { 
394             return owningDAO.getSession(trans);
395         }
396         
397         // OK, nothing else works... get our own.
398         if (session==null || resetTrigger) {
399             Cluster tempCluster = null;
400             Session tempSession = null;
401             try {
402                 synchronized(LOCK) {
403                     boolean reset = false;
404                     for (ResetRequest r : resetDeque) {
405                         if (r.session == session) {
406                             if (r.timestamp>nextAvailableReset) {
407                                 reset=true;
408                                 nextAvailableReset = System.currentTimeMillis() + 60000;
409                                 tempCluster = cluster;
410                                 tempSession = session;
411                                 break;
412                             } else {
413                                 trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");
414                             }
415                         }
416                     }
417     
418                     if (reset || session == null) {
419                         TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);
420                         try {
421                             // Note: Maitrayee recommended not closing the cluster, just
422                             // overwrite it. Jonathan 9/30/2016 assuming same for Session
423                             // This was a bad idea.  Ran out of File Handles as I suspected, Jonathan
424                             if (reset) {
425                                 for (AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
426                                     psi.reset();
427                                 }
428                             }
429                             if (reset || cluster==null) {
430                                 cluster = CassAccess.cluster(trans, keyspace);
431                                 trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);
432                             }
433                             if (reset || session==null) {
434                                 session = cluster.connect(keyspace);
435                                 trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);
436                             }
437                         } finally {
438                             resetTrigger=false;
439                             tt.done();
440                         }
441                     }
442                 }
443             } finally {
444                 TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);
445                 try {
446                     resetDeque.clear();
447                     // Not clearing Session/Cluster appears to kill off FileHandles
448                     if (tempSession!=null && !tempSession.isClosed()) {
449                         tempSession.close();
450                     }
451                     if (tempCluster!=null && !tempCluster.isClosed()) {
452                         tempCluster.close();
453                     }
454                 } finally {
455                     tt.done();
456                 }
457             }
458         }
459         return session;
460     }
461     
462     public final boolean reportPerhapsReset(TransStore trans, Exception e) {
463         if (owningDAO!=null) {
464             return owningDAO.reportPerhapsReset(trans, e);
465         } else {
466             boolean rv = false;
467             if (CassAccess.isResetException(e)) {
468                 trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());
469                 resetDeque.addFirst(new ResetRequest(session));
470                 rv = resetTrigger = true;
471             } 
472             trans.error().log(e);
473             return rv;
474         }
475     }
476
477     public void close(TransStore trans) {
478         if (owningDAO==null) {
479             if (session!=null) {
480                 TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);
481                 try {
482                     session.close();
483                 } finally {
484                     tt.done();
485                 }
486                 session = null;
487             } else {
488                 trans.debug().log("close called(), Session already closed");
489             }
490         } else {
491             owningDAO.close(trans);
492         }
493     }
494
495     protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {
496     }
497     
498     protected interface Accept<DATA> {
499         public boolean ok(DATA data);
500     }
501
502 }
503
504
505