Collection syntax change because of Sonar
[aaf/authz.git] / auth / auth-cass / src / main / java / org / onap / aaf / auth / dao / AbsCassDAO.java
1 /**
2  * ============LICENSE_START====================================================
3  * org.onap.aaf
4  * ===========================================================================
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6  * ===========================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END====================================================
19  *
20  */
21
22 package org.onap.aaf.auth.dao;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Deque;
27 import java.util.List;
28 import java.util.concurrent.ConcurrentLinkedDeque;
29
30 import org.onap.aaf.auth.dao.cass.Status;
31 import org.onap.aaf.auth.layer.Result;
32 import org.onap.aaf.misc.env.APIException;
33 import org.onap.aaf.misc.env.Env;
34 import org.onap.aaf.misc.env.TimeTaken;
35 import org.onap.aaf.misc.env.TransStore;
36
37 import com.datastax.driver.core.BoundStatement;
38 import com.datastax.driver.core.Cluster;
39 import com.datastax.driver.core.ConsistencyLevel;
40 import com.datastax.driver.core.PreparedStatement;
41 import com.datastax.driver.core.ResultSet;
42 import com.datastax.driver.core.ResultSetFuture;
43 import com.datastax.driver.core.Row;
44 import com.datastax.driver.core.Session;
45 import com.datastax.driver.core.exceptions.DriverException;
46
47 public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {
48         protected static final char DOT = '.';
49         protected static final char DOT_PLUS_ONE = '.'+1;
50         protected static final String FIRST_CHAR = Character.toString((char)0);
51         protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);
52         protected static final int FIELD_COMMAS = 0;
53         protected static final int QUESTION_COMMAS = 1;
54         protected static final int ASSIGNMENT_COMMAS = 2;
55         protected static final int WHERE_ANDS = 3;
56
57         private Cluster cluster; 
58         /*
59          * From DataStax
60          * com.datastax.driver.core.Session
61                 A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes, 
62                 provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for 
63                 failed query (when it makes sense), etc...
64                 Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one 
65                 keyspace at a time, so one instance per keyspace is necessary.
66          */
67         private Session session;
68         private final String keyspace;
69         // If this is null, then we own session
70         private final AbsCassDAO<TRANS,?> owningDAO;
71         protected Class<DATA> dataClass;
72         private final String name;
73 //      private static Slot sessionSlot; // not used since 2015
74         private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<>();
75         private static final List<Object> EMPTY = new ArrayList<>(0);
76         private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();
77         private static boolean resetTrigger = false;
78         private static long nextAvailableReset = 0;
79         
80         public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {
81                 this.name = name;
82                 this.cluster = cluster;
83                 this.keyspace = keyspace;
84                 owningDAO = null;  // we own session
85                 session = null;
86                 this.dataClass = dataClass;
87         }
88
89         public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {
90                 this.name = name;
91                 cluster = aDao.cluster;
92                 keyspace = aDao.keyspace;
93                 session = null;
94                 // We do not own session
95                 owningDAO = aDao;
96                 this.dataClass = dataClass;
97         }
98         
99 // Not used since 2015
100 //      public static void setSessionSlot(Slot slot) {
101 //              sessionSlot = slot;
102 //      }
103
104         //Note: Lower case ON PURPOSE. These names used to create History Messages
105         public enum CRUD {
106                 create,read,update,delete;
107         }
108
109         public class PSInfo {
110                 private PreparedStatement ps;
111                 private final int size;
112                 private final Loader<DATA> loader;
113                 private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader
114                 private final String cql;
115                 private final ConsistencyLevel consistency;
116
117
118                 /**
119                  * Create a PSInfo and create Prepared Statement
120                  * 
121                  * @param trans
122                  * @param theCQL
123                  * @param loader
124                  */
125                 public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {
126                         this.loader = loader;
127                         this.consistency=consistency;
128                         psinfos.add(this);
129
130                         cql = theCQL.trim().toUpperCase();
131                         if(cql.startsWith("INSERT")) {
132                                 crud = CRUD.create;
133                         } else if(cql.startsWith("UPDATE")) {
134                                 crud = CRUD.update;
135                         } else if(cql.startsWith("DELETE")) {
136                                 crud = CRUD.delete;
137                         } else {
138                                 crud = CRUD.read;
139                         }
140                         
141                         int idx = 0, count=0;
142                         while((idx=cql.indexOf('?',idx))>=0) {
143                                 ++idx;
144                                 ++count;
145                         }
146                         size=count;
147                 }
148                 
149                 public synchronized void reset() {
150                         ps = null;
151                 }
152                 
153                 private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException {
154                         /* From Datastax
155                                 You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe). 
156                                 If you call prepare multiple times with the same query string, the driver will log a warning.
157                         */
158                         if(ps==null) {
159                                 TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);
160                                 try {
161                                         ps = getSession(trans).prepare(cql);
162                                         ps.setConsistencyLevel(consistency);
163                                 } catch (DriverException e) {
164                                         reportPerhapsReset(trans,e);
165                                         throw e;
166                                 } finally {
167                                         tt.done();
168                                 }
169                         }
170                         // BoundStatements are NOT threadsafe... need a new one each time.
171                         return new BoundStatement(ps);
172                 }
173
174                 /**
175                  * Execute a Prepared Statement by extracting from DATA object
176                  * 
177                  * @param trans
178                  * @param text
179                  * @param data
180                  * @return
181                  */
182                 public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {
183                         TimeTaken tt = trans.start(text, Env.REMOTE);
184                         try {
185                                 return Result.ok(getSession(trans).executeAsync(
186                                                 ps(trans).bind(loader.extract(data, size, crud))));
187                         } catch (DriverException | APIException | IOException e) {
188                                 AbsCassDAO.this.reportPerhapsReset(trans,e);
189                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
190                         } finally {
191                                 tt.done();
192                         }
193                 }
194
195                 /**
196                  * Execute a Prepared Statement on Object[] key
197                  * 
198                  * @param trans
199                  * @param text
200                  * @param objs
201                  * @return
202                  */
203                 public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {
204                         TimeTaken tt = trans.start(text, Env.REMOTE);
205                         try {
206                                 return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));
207                         } catch (DriverException | APIException | IOException e) {
208                                 AbsCassDAO.this.reportPerhapsReset(trans,e);
209                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
210                         } finally {
211                                 tt.done();
212                         }
213                 }
214                 
215                 /* 
216                  * Note:
217                  * 
218                  */
219
220                 /**
221                  * Execute a Prepared Statement by extracting from DATA object
222                  * 
223                  * @param trans
224                  * @param text
225                  * @param data
226                  * @return
227                  */
228                 public Result<ResultSet> exec(TRANS trans, String text, DATA data) {
229                         TimeTaken tt = trans.start(text, Env.REMOTE);
230                         try {
231                                 /*
232                                  * "execute" (and executeAsync)
233                                  * Executes the provided query.
234                                         This method blocks until at least some result has been received from the database. However, 
235                                         for SELECT queries, it does not guarantee that the result has been received in full. But it 
236                                         does guarantee that some response has been received from the database, and in particular 
237                                         guarantee that if the request is invalid, an exception will be thrown by this method.
238
239                                         Parameters:
240                                         statement - the CQL query to execute (that can be any Statement).
241                                         Returns:
242                                                 the result of the query. That result will never be null but can be empty (and will 
243                                                 be for any non SELECT query).
244                                  */
245                                 return Result.ok(getSession(trans).execute(
246                                                 ps(trans).bind(loader.extract(data, size, crud))));
247                         } catch (DriverException | APIException | IOException e) {
248                                 AbsCassDAO.this.reportPerhapsReset(trans,e);
249                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
250                         } finally {
251                                 tt.done();
252                         }
253                 }
254
255                 /**
256                  * Execute a Prepared Statement on Object[] key
257                  * 
258                  * @param trans
259                  * @param text
260                  * @param objs
261                  * @return
262                  */
263                 public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {
264                         TimeTaken tt = trans.start(text, Env.REMOTE);
265                         try {
266                                 return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));
267                         } catch (DriverException | APIException | IOException e) {
268                                 AbsCassDAO.this.reportPerhapsReset(trans,e);
269                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
270                         } finally {
271                                 tt.done();
272                         }
273                 }
274
275                 /**
276                  * Read the Data from Cassandra given a Prepared Statement (defined by the
277                  * DAO Instance)
278                  *
279                  * This is common behavior among all DAOs.
280                  * @throws DAOException
281                  */
282                 public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {
283                         TimeTaken tt = trans.start(text,Env.REMOTE);
284                         
285                         ResultSet rs;
286                         try {
287                                 rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));
288 /// TEST CODE for Exception                             
289 //                              boolean force = true; 
290 //                              if(force) {
291 //                                      Map<InetSocketAddress, Throwable> misa = new HashMap<>();
292 //                                      //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));
293 //                                      misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));
294 //                                      throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);
295 ////                                    throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");
296 //                              }
297 //// END TEST CODE
298                         } catch (DriverException | APIException | IOException e) {
299                                 AbsCassDAO.this.reportPerhapsReset(trans,e);
300                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
301                         } finally {
302                                 tt.done();
303                         }
304                         
305                         return extract(loader,rs,null /*let Array be created if necessary*/,dflt);
306                 }
307                 
308                 public Result<List<DATA>> read(TRANS trans, String text, DATA data) {
309                         return read(trans,text, loader.extract(data, size, crud));
310                 }
311                 
312                 public Object[] keyFrom(DATA data) {
313                         return loader.extract(data, size, CRUD.delete); // Delete is key only
314                 }
315
316                 /*
317                  * Note: in case PSInfos are deleted, we want to remove them from list.  This is not expected, 
318                  * but we don't want a data leak if it does.  Finalize doesn't have to happen quickly
319                  */
320                 @Override
321                 protected void finalize() throws Throwable {
322                         psinfos.remove(this);
323                 }
324         }
325
326         protected final Accept<DATA> dflt = new Accept<DATA>() {
327                 @Override
328                 public boolean ok(DATA data) {
329                         return true;
330                 }
331         };
332
333
334         @SuppressWarnings("unchecked")
335     protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {
336                 List<Row> rows = rs.all();
337                 if(rows.isEmpty()) {
338                         return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);
339                 } else {
340                         DATA d;
341                         List<DATA> data = indata==null?new ArrayList<>(rows.size()):indata;
342                         
343                         for(Row row : rows) {
344                                 try {
345                                         d = loader.load(dataClass.newInstance(),row);
346                                         if(accept.ok(d)) {
347                                                 data.add(d);
348                                         }
349                                 } catch(Exception e) {
350                                         return Result.err(e);
351                                 }
352                         }
353                         return Result.ok(data);
354                 }
355     }
356     
357         private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
358         private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
359         private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
360         private static final Object LOCK = new Object();
361
362         private static class ResetRequest {
363                 //package on purpose
364                 Session session;
365                 long timestamp;
366                 
367                 public ResetRequest(Session session) {
368                         this.session = session;
369                         timestamp = System.currentTimeMillis();
370                 }
371         }
372
373         
374         public static final void primePSIs(TransStore trans) throws APIException, IOException {
375                 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
376                         if(psi.ps==null) {
377                                 psi.ps(trans);
378                         }
379                 }
380         }
381         
382         public final Session getSession(TransStore trans) throws APIException, IOException {
383                 // SessionFilter unused since 2015
384                 // Try to use Trans' session, if exists
385 //              if(sessionSlot!=null) { // try to get from Trans
386 //                      Session sess = trans.get(sessionSlot, null);
387 //                      if(sess!=null) {
388 //                              return sess;
389 //                      }
390 //              }
391                 
392                 // If there's an owning DAO, use it's session
393                 if(owningDAO!=null) { 
394                         return owningDAO.getSession(trans);
395                 }
396                 
397                 // OK, nothing else works... get our own.
398                 if(session==null || resetTrigger) {
399                         Cluster tempCluster = null;
400                         Session tempSession = null;
401                         try {
402                                 synchronized(LOCK) {
403                                         boolean reset = false;
404                                         for(ResetRequest r : resetDeque) {
405                                                 if(r.session == session) {
406                                                         if(r.timestamp>nextAvailableReset) {
407                                                                 reset=true;
408                                                                 nextAvailableReset = System.currentTimeMillis() + 60000;
409                                                                 tempCluster = cluster;
410                                                                 tempSession = session;
411                                                                 break;
412                                                         } else {
413                                                                 trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");
414                                                         }
415                                                 }
416                                         }
417         
418                                         if(reset || session == null) {
419                                                 TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);
420                                                 try {
421                                                         // Note: Maitrayee recommended not closing the cluster, just
422                                                         // overwrite it. Jonathan 9/30/2016 assuming same for Session
423                                                         // This was a bad idea.  Ran out of File Handles as I suspected, Jonathan
424                                                         if(reset) {
425                                                                 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
426                                                                         psi.reset();
427                                                                 }
428                                                         }
429                                                         if(reset || cluster==null) {
430                                                                 cluster = CassAccess.cluster(trans, keyspace);
431                                                                 trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);
432                                                         }
433                                                         if(reset || session==null) {
434                                                                 session = cluster.connect(keyspace);
435                                                                 trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);
436                                                         }
437                                                 } finally {
438                                                         resetTrigger=false;
439                                                         tt.done();
440                                                 }
441                                         }
442                                 }
443                         } finally {
444                                 TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);
445                                 try {
446                                         resetDeque.clear();
447                                         // Not clearing Session/Cluster appears to kill off FileHandles
448                                         if(tempSession!=null && !tempSession.isClosed()) {
449                                                 tempSession.close();
450                                         }
451                                         if(tempCluster!=null && !tempCluster.isClosed()) {
452                                                 tempCluster.close();
453                                         }
454                                 } finally {
455                                         tt.done();
456                                 }
457                         }
458                 }
459                 return session;
460         }
461         
462         public final boolean reportPerhapsReset(TransStore trans, Exception e) {
463                 if(owningDAO!=null) {
464                         return owningDAO.reportPerhapsReset(trans, e);
465                 } else {
466                         boolean rv = false;
467                         if(CassAccess.isResetException(e)) {
468                                 trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());
469                                 resetDeque.addFirst(new ResetRequest(session));
470                                 rv = resetTrigger = true;
471                         } 
472                         trans.error().log(e);
473                         return rv;
474                 }
475         }
476
477         public void close(TransStore trans) {
478                 if(owningDAO==null) {
479                         if(session!=null) {
480                                 TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);
481                                 try {
482                                         session.close();
483                                 } finally {
484                                         tt.done();
485                                 }
486                                 session = null;
487                         } else {
488                                 trans.debug().log("close called(), Session already closed");
489                         }
490                 } else {
491                         owningDAO.close(trans);
492                 }
493         }
494
495         protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {
496         }
497         
498         protected interface Accept<DATA> {
499                 public boolean ok(DATA data);
500         }
501
502 }
503
504
505