AbsCassDAO.java -sonar fix
[aaf/authz.git] / auth / auth-cass / src / main / java / org / onap / aaf / auth / dao / AbsCassDAO.java
1 /**
2  * ============LICENSE_START====================================================
3  * org.onap.aaf
4  * ===========================================================================
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6  * ===========================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END====================================================
19  *
20  */
21
22 package org.onap.aaf.auth.dao;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Deque;
27 import java.util.List;
28 import java.util.concurrent.ConcurrentLinkedDeque;
29
30 import org.onap.aaf.auth.dao.cass.Status;
31 import org.onap.aaf.auth.layer.Result;
32 import org.onap.aaf.misc.env.APIException;
33 import org.onap.aaf.misc.env.Env;
34 import org.onap.aaf.misc.env.TimeTaken;
35 import org.onap.aaf.misc.env.TransStore;
36
37 import com.datastax.driver.core.BoundStatement;
38 import com.datastax.driver.core.Cluster;
39 import com.datastax.driver.core.ConsistencyLevel;
40 import com.datastax.driver.core.PreparedStatement;
41 import com.datastax.driver.core.ResultSet;
42 import com.datastax.driver.core.ResultSetFuture;
43 import com.datastax.driver.core.Row;
44 import com.datastax.driver.core.Session;
45 import com.datastax.driver.core.exceptions.DriverException;
46
47 public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {
48     protected static final char DOT = '.';
49     protected static final char DOT_PLUS_ONE = '.'+1;
50     protected static final String FIRST_CHAR = Character.toString((char)0);
51     protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);
52     protected static final int FIELD_COMMAS = 0;
53     protected static final int QUESTION_COMMAS = 1;
54     protected static final int ASSIGNMENT_COMMAS = 2;
55     protected static final int WHERE_ANDS = 3;
56
57     private Cluster cluster;
58     /*
59      * From DataStax
60      * com.datastax.driver.core.Session
61         A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes,
62         provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for
63         failed query (when it makes sense), etc...
64         Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one
65         keyspace at a time, so one instance per keyspace is necessary.
66      */
67     private Session session;
68     private final String keyspace;
69     // If this is null, then we own session
70     private final AbsCassDAO<TRANS,?> owningDAO;
71     protected Class<DATA> dataClass;
72     private final String name;
73 //    private static Slot sessionSlot; // not used since 2015
74     private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<>();
75     private static final List<Object> EMPTY = new ArrayList<>(0);
76     private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<>();
77     private static boolean resetTrigger = false;
78     private static long nextAvailableReset = 0;
79
80     public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {
81         this.name = name;
82         this.cluster = cluster;
83         this.keyspace = keyspace;
84         owningDAO = null;  // we own session
85         session = null;
86         this.dataClass = dataClass;
87     }
88
89     public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {
90         this.name = name;
91         cluster = aDao.cluster;
92         keyspace = aDao.keyspace;
93         session = null;
94         // We do not own session
95         owningDAO = aDao;
96         this.dataClass = dataClass;
97     }
98
99
100     //Note: Lower case ON PURPOSE. These names used to create History Messages
101     public enum CRUD {
102         create,read,update,delete;
103     }
104
105     public class PSInfo {
106         private PreparedStatement ps;
107         private final int size;
108         private final Loader<DATA> loader;
109         private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader
110         private final String cql;
111         private final ConsistencyLevel consistency;
112
113
114         /**
115          * Create a PSInfo and create Prepared Statement
116          *
117          * @param trans
118          * @param theCQL
119          * @param loader
120          */
121         public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {
122             this.loader = loader;
123             this.consistency=consistency;
124             psinfos.add(this);
125
126             cql = theCQL.trim().toUpperCase();
127             if (cql.startsWith("INSERT")) {
128                 crud = CRUD.create;
129             } else if (cql.startsWith("UPDATE")) {
130                 crud = CRUD.update;
131             } else if (cql.startsWith("DELETE")) {
132                 crud = CRUD.delete;
133             } else {
134                 crud = CRUD.read;
135             }
136
137             int idx = 0, count=0;
138             while ((idx=cql.indexOf('?',idx))>=0) {
139                 ++idx;
140                 ++count;
141             }
142             size=count;
143         }
144
145         public synchronized void reset() {
146             ps = null;
147         }
148
149         private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException {
150             /* From Datastax
151                 You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe).
152                 If you call prepare multiple times with the same query string, the driver will log a warning.
153             */
154             if (ps==null) {
155                 TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);
156                 try {
157                     ps = getSession(trans).prepare(cql);
158                     ps.setConsistencyLevel(consistency);
159                 } catch (DriverException e) {
160                     reportPerhapsReset(trans,e);
161                     throw e;
162                 } finally {
163                     tt.done();
164                 }
165             }
166             // BoundStatements are NOT threadsafe... need a new one each time.
167             return new BoundStatement(ps);
168         }
169
170         /**
171          * Execute a Prepared Statement by extracting from DATA object
172          *
173          * @param trans
174          * @param text
175          * @param data
176          * @return
177          */
178         public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {
179             TimeTaken tt = trans.start(text, Env.REMOTE);
180             try {
181                 return Result.ok(getSession(trans).executeAsync(
182                         ps(trans).bind(loader.extract(data, size, crud))));
183             } catch (DriverException | APIException | IOException e) {
184                 AbsCassDAO.this.reportPerhapsReset(trans,e);
185                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
186             } finally {
187                 tt.done();
188             }
189         }
190
191         /**
192          * Execute a Prepared Statement on Object[] key
193          *
194          * @param trans
195          * @param text
196          * @param objs
197          * @return
198          */
199         public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {
200             TimeTaken tt = trans.start(text, Env.REMOTE);
201             try {
202                 return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));
203             } catch (DriverException | APIException | IOException e) {
204                 AbsCassDAO.this.reportPerhapsReset(trans,e);
205                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
206             } finally {
207                 tt.done();
208             }
209         }
210
211         /*
212          * Note:
213          *
214          */
215
216         /**
217          * Execute a Prepared Statement by extracting from DATA object
218          *
219          * @param trans
220          * @param text
221          * @param data
222          * @return
223          */
224         public Result<ResultSet> exec(TRANS trans, String text, DATA data) {
225             TimeTaken tt = trans.start(text, Env.REMOTE);
226             try {
227                 /*
228                  * "execute" (and executeAsync)
229                  * Executes the provided query.
230                     This method blocks until at least some result has been received from the database. However,
231                     for SELECT queries, it does not guarantee that the result has been received in full. But it
232                     does guarantee that some response has been received from the database, and in particular
233                     guarantee that if the request is invalid, an exception will be thrown by this method.
234
235                     Parameters:
236                     statement - the CQL query to execute (that can be any Statement).
237                     Returns:
238                         the result of the query. That result will never be null but can be empty (and will
239                         be for any non SELECT query).
240                  */
241                 return Result.ok(getSession(trans).execute(
242                         ps(trans).bind(loader.extract(data, size, crud))));
243             } catch (DriverException | APIException | IOException e) {
244                 AbsCassDAO.this.reportPerhapsReset(trans,e);
245                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
246             } finally {
247                 tt.done();
248             }
249         }
250
251         /**
252          * Execute a Prepared Statement on Object[] key
253          *
254          * @param trans
255          * @param text
256          * @param objs
257          * @return
258          */
259         public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {
260             TimeTaken tt = trans.start(text, Env.REMOTE);
261             try {
262                 return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));
263             } catch (DriverException | APIException | IOException e) {
264                 AbsCassDAO.this.reportPerhapsReset(trans,e);
265                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
266             } finally {
267                 tt.done();
268             }
269         }
270
271         /**
272          * Read the Data from Cassandra given a Prepared Statement (defined by the
273          * DAO Instance)
274          *
275          * This is common behavior among all DAOs.
276          * @throws DAOException
277          */
278         public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {
279             TimeTaken tt = trans.start(text,Env.REMOTE);
280
281             ResultSet rs;
282             try {
283                 rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));
284 /// TEST CODE for Exception
285 //                boolean force = true;
286 //                if (force) {
287 //                    Map<InetSocketAddress, Throwable> misa = new HashMap<>();
288 //                    //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));
289 //                    misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));
290 //                    throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);
291 ////                    throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");
292 //                }
293 //// END TEST CODE
294             } catch (DriverException | APIException | IOException e) {
295                 AbsCassDAO.this.reportPerhapsReset(trans,e);
296                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
297             } finally {
298                 tt.done();
299             }
300
301             return extract(loader,rs,null /*let Array be created if necessary*/,dflt);
302         }
303
304         public Result<List<DATA>> read(TRANS trans, String text, DATA data) {
305             return read(trans,text, loader.extract(data, size, crud));
306         }
307
308         public Object[] keyFrom(DATA data) {
309             return loader.extract(data, size, CRUD.delete); // Delete is key only
310         }
311
312         /*
313          * Note: in case PSInfos are deleted, we want to remove them from list.  This is not expected,
314          * but we don't want a data leak if it does.  Finalize doesn't have to happen quickly
315          */
316         @Override
317         protected void finalize() throws Throwable {
318             psinfos.remove(this);
319         }
320     }
321
322     protected final Accept<DATA> dflt = new Accept<DATA>() {
323         @Override
324         public boolean ok(DATA data) {
325             return true;
326         }
327     };
328
329
330     @SuppressWarnings("unchecked")
331     protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {
332         List<Row> rows = rs.all();
333         if (rows.isEmpty()) {
334             return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);
335         } else {
336             DATA d;
337             List<DATA> data = indata==null?new ArrayList<>(rows.size()):indata;
338
339             for (Row row : rows) {
340                 try {
341                     d = loader.load(dataClass.newInstance(),row);
342                     if (accept.ok(d)) {
343                         data.add(d);
344                     }
345                 } catch (Exception e) {
346                     return Result.err(e);
347                 }
348             }
349             return Result.ok(data);
350         }
351     }
352
353     private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
354     private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
355     private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
356     private static final Object LOCK = new Object();
357
358     private static class ResetRequest {
359         //package on purpose
360         Session session;
361         long timestamp;
362
363         public ResetRequest(Session session) {
364             this.session = session;
365             timestamp = System.currentTimeMillis();
366         }
367     }
368
369
370     public static final void primePSIs(TransStore trans) throws APIException, IOException {
371         for (AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
372             if (psi.ps==null) {
373                 psi.ps(trans);
374             }
375         }
376     }
377
378     public final Session getSession(TransStore trans) throws APIException, IOException {
379         // SessionFilter unused since 2015
380         // Try to use Trans' session, if exists
381 //        if (sessionSlot!=null) { // try to get from Trans
382 //            Session sess = trans.get(sessionSlot, null);
383 //            if (sess!=null) {
384 //                return sess;
385 //            }
386 //        }
387
388         // If there's an owning DAO, use it's session
389         if (owningDAO!=null) {
390             return owningDAO.getSession(trans);
391         }
392
393         // OK, nothing else works... get our own.
394         if (session==null || resetTrigger) {
395             Cluster tempCluster = null;
396             Session tempSession = null;
397             try {
398                 synchronized(LOCK) {
399                     boolean reset = false;
400                     for (ResetRequest r : resetDeque) {
401                         if (r.session == session) {
402                             if (r.timestamp>nextAvailableReset) {
403                                 reset=true;
404                                 nextAvailableReset = System.currentTimeMillis() + 60000;
405                                 tempCluster = cluster;
406                                 tempSession = session;
407                                 break;
408                             } else {
409                                 trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");
410                             }
411                         }
412                     }
413
414                     if (reset || session == null) {
415                         TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);
416                         try {
417                             // Note: Maitrayee recommended not closing the cluster, just
418                             // overwrite it. Jonathan 9/30/2016 assuming same for Session
419                             // This was a bad idea.  Ran out of File Handles as I suspected, Jonathan
420                             if (reset) {
421                                 for (AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
422                                     psi.reset();
423                                 }
424                             }
425                             if (reset || cluster==null) {
426                                 cluster = CassAccess.cluster(trans, keyspace);
427                                 trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);
428                             }
429                             if (reset || session==null) {
430                                 session = cluster.connect(keyspace);
431                                 trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);
432                             }
433                         } finally {
434                             resetTrigger=false;
435                             tt.done();
436                         }
437                     }
438                 }
439             } finally {
440                 TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);
441                 try {
442                     resetDeque.clear();
443                     // Not clearing Session/Cluster appears to kill off FileHandles
444                     if (tempSession!=null && !tempSession.isClosed()) {
445                         tempSession.close();
446                     }
447                     if (tempCluster!=null && !tempCluster.isClosed()) {
448                         tempCluster.close();
449                     }
450                 } finally {
451                     tt.done();
452                 }
453             }
454         }
455         return session;
456     }
457
458     public final boolean reportPerhapsReset(TransStore trans, Exception e) {
459         if (owningDAO!=null) {
460             return owningDAO.reportPerhapsReset(trans, e);
461         } else {
462             boolean rv = false;
463             if (CassAccess.isResetException(e)) {
464                 trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());
465                 resetDeque.addFirst(new ResetRequest(session));
466                 rv = resetTrigger = true;
467             }
468             trans.error().log(e);
469             return rv;
470         }
471     }
472
473     public void close(TransStore trans) {
474         if (owningDAO==null) {
475             if (session!=null) {
476                 TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);
477                 try {
478                     session.close();
479                 } finally {
480                     tt.done();
481                 }
482                 session = null;
483             } else {
484                 trans.debug().log("close called(), Session already closed");
485             }
486         } else {
487             owningDAO.close(trans);
488         }
489     }
490
491     protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {
492     }
493
494     protected interface Accept<DATA> {
495         public boolean ok(DATA data);
496     }
497
498 }
499
500
501