AT&T 2.0.19 Code drop, stage 3
[aaf/authz.git] / auth / auth-cass / src / main / java / org / onap / aaf / auth / dao / AbsCassDAO.java
1 /**
2  * ============LICENSE_START====================================================
3  * org.onap.aaf
4  * ===========================================================================
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6  * ===========================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END====================================================
19  *
20  */
21
22 package org.onap.aaf.auth.dao;
23
24 import java.io.IOException;
25 import java.util.ArrayList;
26 import java.util.Deque;
27 import java.util.List;
28 import java.util.concurrent.ConcurrentLinkedDeque;
29
30 import org.onap.aaf.auth.dao.cass.Status;
31 import org.onap.aaf.auth.layer.Result;
32 import org.onap.aaf.misc.env.APIException;
33 import org.onap.aaf.misc.env.Env;
34 import org.onap.aaf.misc.env.TimeTaken;
35 import org.onap.aaf.misc.env.TransStore;
36
37 import com.datastax.driver.core.BoundStatement;
38 import com.datastax.driver.core.Cluster;
39 import com.datastax.driver.core.ConsistencyLevel;
40 import com.datastax.driver.core.PreparedStatement;
41 import com.datastax.driver.core.ResultSet;
42 import com.datastax.driver.core.ResultSetFuture;
43 import com.datastax.driver.core.Row;
44 import com.datastax.driver.core.Session;
45 import com.datastax.driver.core.exceptions.DriverException;
46
47 public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {
48         protected static final char DOT = '.';
49         protected static final char DOT_PLUS_ONE = '.'+1;
50         protected static final String FIRST_CHAR = Character.toString((char)0);
51         protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);
52         protected static final int FIELD_COMMAS = 0;
53         protected static final int QUESTION_COMMAS = 1;
54         protected static final int ASSIGNMENT_COMMAS = 2;
55         protected static final int WHERE_ANDS = 3;
56
57         private Cluster cluster; 
58         /*
59          * From DataStax
60          * com.datastax.driver.core.Session
61                 A session holds connections to a Cassandra cluster, allowing it to be queried. Each session maintains multiple connections to the cluster nodes, 
62                 provides policies to choose which node to use for each query (round-robin on all nodes of the cluster by default), and handles retries for 
63                 failed query (when it makes sense), etc...
64                 Session instances are thread-safe and usually a single instance is enough per application. However, a given session can only be set to one 
65                 keyspace at a time, so one instance per keyspace is necessary.
66          */
67         private Session session;
68         private final String keyspace;
69         // If this is null, then we own session
70         private final AbsCassDAO<TRANS,?> owningDAO;
71         protected Class<DATA> dataClass;
72         private final String name;
73 //      private static Slot sessionSlot; // not used since 2015
74         private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo>();
75         private static final List<Object> EMPTY = new ArrayList<Object>(0);
76         private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();
77         private static boolean resetTrigger = false;
78         private static long nextAvailableReset = 0;
79         
80         public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {
81                 this.name = name;
82                 this.cluster = cluster;
83                 this.keyspace = keyspace;
84                 owningDAO = null;  // we own session
85                 session = null;
86                 this.dataClass = dataClass;
87         }
88
89         public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {
90                 this.name = name;
91                 cluster = aDao.cluster;
92                 keyspace = aDao.keyspace;
93                 session = null;
94                 // We do not own session
95                 owningDAO = aDao;
96                 this.dataClass = dataClass;
97         }
98         
99 // Not used since 2015
100 //      public static void setSessionSlot(Slot slot) {
101 //              sessionSlot = slot;
102 //      }
103
104         //Note: Lower case ON PURPOSE. These names used to create History Messages
105         public enum CRUD {
106                 create,read,update,delete;
107         }
108
109         public class PSInfo {
110                 private PreparedStatement ps;
111                 private final int size;
112                 private final Loader<DATA> loader;
113                 private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader
114                 private final String cql;
115                 private final ConsistencyLevel consistency;
116
117
118                 /**
119                  * Create a PSInfo and create Prepared Statement
120                  * 
121                  * @param trans
122                  * @param theCQL
123                  * @param loader
124                  */
125                 public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {
126                         this.loader = loader;
127                         this.consistency=consistency;
128                         psinfos.add(this);
129
130                         cql = theCQL.trim().toUpperCase();
131                         if(cql.startsWith("INSERT")) {
132                                 crud = CRUD.create;
133                         } else if(cql.startsWith("UPDATE")) {
134                                 crud = CRUD.update;
135                         } else if(cql.startsWith("DELETE")) {
136                                 crud = CRUD.delete;
137                         } else {
138                                 crud = CRUD.read;
139                         }
140                         
141                         int idx = 0, count=0;
142                         while((idx=cql.indexOf('?',idx))>=0) {
143                                 ++idx;
144                                 ++count;
145                         }
146                         size=count;
147                 }
148                 
149                 public synchronized void reset() {
150                         ps = null;
151                 }
152                 
153                 private synchronized BoundStatement ps(TransStore trans) throws APIException, IOException {
154                         /* From Datastax
155                                 You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe). 
156                                 If you call prepare multiple times with the same query string, the driver will log a warning.
157                         */
158                         if(ps==null) {
159                                 TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);
160                                 try {
161                                         ps = getSession(trans).prepare(cql);
162                                         ps.setConsistencyLevel(consistency);
163                                 } catch (DriverException e) {
164                                         reportPerhapsReset(trans,e);
165                                         throw e;
166                                 } finally {
167                                         tt.done();
168                                 }
169                         }
170                         // BoundStatements are NOT threadsafe... need a new one each time.
171                         return new BoundStatement(ps);
172                 }
173
174                 /**
175                  * Execute a Prepared Statement by extracting from DATA object
176                  * 
177                  * @param trans
178                  * @param text
179                  * @param data
180                  * @return
181                  */
182                 public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {
183                         TimeTaken tt = trans.start(text, Env.REMOTE);
184                         try {
185                                 return Result.ok(getSession(trans).executeAsync(
186                                                 ps(trans).bind(loader.extract(data, size, crud))));
187                         } catch (DriverException | APIException | IOException e) {
188                                 AbsCassDAO.this.reportPerhapsReset(trans,e);
189                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
190                         } finally {
191                                 tt.done();
192                         }
193                 }
194
195                 /**
196                  * Execute a Prepared Statement on Object[] key
197                  * 
198                  * @param trans
199                  * @param text
200                  * @param objs
201                  * @return
202                  */
203                 public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {
204                         TimeTaken tt = trans.start(text, Env.REMOTE);
205                         try {
206                                 return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));
207                         } catch (DriverException | APIException | IOException e) {
208                                 AbsCassDAO.this.reportPerhapsReset(trans,e);
209                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
210                         } finally {
211                                 tt.done();
212                         }
213                 }
214                 
215                 /* 
216                  * Note:
217                  * 
218                  */
219
220                 /**
221                  * Execute a Prepared Statement by extracting from DATA object
222                  * 
223                  * @param trans
224                  * @param text
225                  * @param data
226                  * @return
227                  */
228                 public Result<ResultSet> exec(TRANS trans, String text, DATA data) {
229                         TimeTaken tt = trans.start(text, Env.REMOTE);
230                         try {
231                                 /*
232                                  * "execute" (and executeAsync)
233                                  * Executes the provided query.
234                                         This method blocks until at least some result has been received from the database. However, 
235                                         for SELECT queries, it does not guarantee that the result has been received in full. But it 
236                                         does guarantee that some response has been received from the database, and in particular 
237                                         guarantee that if the request is invalid, an exception will be thrown by this method.
238
239                                         Parameters:
240                                         statement - the CQL query to execute (that can be any Statement).
241                                         Returns:
242                                                 the result of the query. That result will never be null but can be empty (and will 
243                                                 be for any non SELECT query).
244                                  */
245                                 return Result.ok(getSession(trans).execute(
246                                                 ps(trans).bind(loader.extract(data, size, crud))));
247                         } catch (DriverException | APIException | IOException e) {
248                                 AbsCassDAO.this.reportPerhapsReset(trans,e);
249                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
250                         } finally {
251                                 tt.done();
252                         }
253                 }
254
255                 /**
256                  * Execute a Prepared Statement on Object[] key
257                  * 
258                  * @param trans
259                  * @param text
260                  * @param objs
261                  * @return
262                  */
263                 public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {
264                         TimeTaken tt = trans.start(text, Env.REMOTE);
265                         try {
266                                 return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));
267                         } catch (DriverException | APIException | IOException e) {
268                                 AbsCassDAO.this.reportPerhapsReset(trans,e);
269                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
270                         } finally {
271                                 tt.done();
272                         }
273                 }
274
275                 /**
276                  * Read the Data from Cassandra given a Prepared Statement (defined by the
277                  * DAO Instance)
278                  *
279                  * This is common behavior among all DAOs.
280                  * @throws DAOException
281                  */
282                 public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {
283                         TimeTaken tt = trans.start(text,Env.REMOTE);
284                         
285                         ResultSet rs;
286                         try {
287                                 rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));
288 /// TEST CODE for Exception                             
289 //                              boolean force = true; 
290 //                              if(force) {
291 //                                      Map<InetSocketAddress, Throwable> misa = new HashMap<InetSocketAddress,Throwable>();
292 //                                      //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));
293 //                                      misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));
294 //                                      throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);
295 ////                                    throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");
296 //                              }
297 //// END TEST CODE
298                         } catch (DriverException | APIException | IOException e) {
299                                 AbsCassDAO.this.reportPerhapsReset(trans,e);
300                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);
301                         } finally {
302                                 tt.done();
303                         }
304                         
305                         return extract(loader,rs,null /*let Array be created if necessary*/,dflt);
306                 }
307                 
308                 public Result<List<DATA>> read(TRANS trans, String text, DATA data) {
309                         return read(trans,text, loader.extract(data, size, crud));
310                 }
311                 
312                 public Object[] keyFrom(DATA data) {
313                         return loader.extract(data, size, CRUD.delete); // Delete is key only
314                 }
315
316                 /*
317                  * Note: in case PSInfos are deleted, we want to remove them from list.  This is not expected, 
318                  * but we don't want a data leak if it does.  Finalize doesn't have to happen quickly
319                  */
320                 @Override
321                 protected void finalize() throws Throwable {
322                         psinfos.remove(this);
323                 }
324         }
325
326         protected final Accept<DATA> dflt = new Accept<DATA>() {
327                 @Override
328                 public boolean ok(DATA data) {
329                         return true;
330                 }
331         };
332
333
334         @SuppressWarnings("unchecked")
335     protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {
336                 List<Row> rows = rs.all();
337                 if(rows.isEmpty()) {
338                         return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);
339                 } else {
340                         DATA d;
341                         List<DATA> data = indata==null?new ArrayList<DATA>(rows.size()):indata;
342                         
343                         for(Row row : rows) {
344                                 try {
345                                         d = loader.load(dataClass.newInstance(),row);
346                                         if(accept.ok(d)) {
347                                                 data.add(d);
348                                         }
349                                 } catch(Exception e) {
350                                         return Result.err(e);
351                                 }
352                         }
353                         return Result.ok(data);
354                 }
355     }
356     
357         private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
358         private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
359         private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
360
361         private static class ResetRequest {
362                 //package on purpose
363                 Session session;
364                 long timestamp;
365                 
366                 public ResetRequest(Session session) {
367                         this.session = session;
368                         timestamp = System.currentTimeMillis();
369                 }
370         }
371
372         
373         public static final void primePSIs(TransStore trans) throws APIException, IOException {
374                 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
375                         if(psi.ps==null) {
376                                 psi.ps(trans);
377                         }
378                 }
379         }
380         
381         public final Session getSession(TransStore trans) throws APIException, IOException {
382                 // SessionFilter unused since 2015
383                 // Try to use Trans' session, if exists
384 //              if(sessionSlot!=null) { // try to get from Trans
385 //                      Session sess = trans.get(sessionSlot, null);
386 //                      if(sess!=null) {
387 //                              return sess;
388 //                      }
389 //              }
390                 
391                 // If there's an owning DAO, use it's session
392                 if(owningDAO!=null) { 
393                         return owningDAO.getSession(trans);
394                 }
395                 
396                 // OK, nothing else works... get our own.
397                 if(session==null || resetTrigger) {
398                         Cluster tempCluster = null;
399                         Session tempSession = null;
400                         try {
401                                 synchronized(NEW_CASSANDRA_SESSION_CREATED) {
402                                         boolean reset = false;
403                                         for(ResetRequest r : resetDeque) {
404                                                 if(r.session == session) {
405                                                         if(r.timestamp>nextAvailableReset) {
406                                                                 reset=true;
407                                                                 nextAvailableReset = System.currentTimeMillis() + 60000;
408                                                                 tempCluster = cluster;
409                                                                 tempSession = session;
410                                                                 break;
411                                                         } else {
412                                                                 trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");
413                                                         }
414                                                 }
415                                         }
416         
417                                         if(reset || session == null) {
418                                                 TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);
419                                                 try {
420                                                         // Note: Maitrayee recommended not closing the cluster, just
421                                                         // overwrite it. Jonathan 9/30/2016 assuming same for Session
422                                                         // This was a bad idea.  Ran out of File Handles as I suspected, Jonathan
423                                                         if(reset) {
424                                                                 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {
425                                                                         psi.reset();
426                                                                 }
427                                                         }
428                                                         if(reset || cluster==null) {
429                                                                 cluster = CassAccess.cluster(trans, keyspace);
430                                                                 trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);
431                                                         }
432                                                         if(reset || session==null) {
433                                                                 session = cluster.connect(keyspace);
434                                                                 trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);
435                                                         }
436                                                 } finally {
437                                                         resetTrigger=false;
438                                                         tt.done();
439                                                 }
440                                         }
441                                 }
442                         } finally {
443                                 TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);
444                                 try {
445                                         resetDeque.clear();
446                                         // Not clearing Session/Cluster appears to kill off FileHandles
447                                         if(tempSession!=null && !tempSession.isClosed()) {
448                                                 tempSession.close();
449                                         }
450                                         if(tempCluster!=null && !tempCluster.isClosed()) {
451                                                 tempCluster.close();
452                                         }
453                                 } finally {
454                                         tt.done();
455                                 }
456                         }
457                 }
458                 return session;
459         }
460         
461         public final boolean reportPerhapsReset(TransStore trans, Exception e) {
462                 if(owningDAO!=null) {
463                         return owningDAO.reportPerhapsReset(trans, e);
464                 } else {
465                         boolean rv = false;
466                         if(CassAccess.isResetException(e)) {
467                                 trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());
468                                 resetDeque.addFirst(new ResetRequest(session));
469                                 rv = resetTrigger = true;
470                         } 
471                         trans.error().log(e);
472                         return rv;
473                 }
474         }
475
476         public void close(TransStore trans) {
477                 if(owningDAO==null) {
478                         if(session!=null) {
479                                 TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);
480                                 try {
481                                         session.close();
482                                 } finally {
483                                         tt.done();
484                                 }
485                                 session = null;
486                         } else {
487                                 trans.debug().log("close called(), Session already closed");
488                         }
489                 } else {
490                         owningDAO.close(trans);
491                 }
492         }
493
494         protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {
495         }
496         
497         protected interface Accept<DATA> {
498                 public boolean ok(DATA data);
499         }
500
501 }
502
503
504