[AAF-21] Updated Copyright Headers for AAF
[aaf/authz.git] / authz-cass / src / main / java / com / att / dao / AbsCassDAO.java
1 /*******************************************************************************\r
2  * ============LICENSE_START====================================================\r
3  * * org.onap.aaf\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * * \r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * * \r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 package com.att.dao;\r
24 \r
25 import java.io.IOException;\r
26 import java.util.ArrayList;\r
27 import java.util.Deque;\r
28 import java.util.List;\r
29 import java.util.concurrent.ConcurrentLinkedDeque;\r
30 \r
31 import com.att.authz.layer.Result;\r
32 import com.att.dao.aaf.cass.Status;\r
33 import com.att.inno.env.APIException;\r
34 import com.att.inno.env.Env;\r
35 import com.att.inno.env.Slot;\r
36 import com.att.inno.env.TimeTaken;\r
37 import com.att.inno.env.TransStore;\r
38 import com.datastax.driver.core.BoundStatement;\r
39 import com.datastax.driver.core.Cluster;\r
40 import com.datastax.driver.core.ConsistencyLevel;\r
41 import com.datastax.driver.core.ResultSet;\r
42 import com.datastax.driver.core.ResultSetFuture;\r
43 import com.datastax.driver.core.Row;\r
44 import com.datastax.driver.core.Session;\r
45 import com.datastax.driver.core.exceptions.DriverException;\r
46 \r
47 public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {\r
48         protected static final char DOT = '.';\r
49         protected static final char DOT_PLUS_ONE = '.'+1;\r
50         protected static final String FIRST_CHAR = Character.toString((char)0);\r
51         protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);\r
52         protected static final int FIELD_COMMAS = 0;\r
53         protected static final int QUESTION_COMMAS = 1;\r
54         protected static final int ASSIGNMENT_COMMAS = 2;\r
55         protected static final int WHERE_ANDS = 3;\r
56         \r
57         private Cluster cluster; \r
58         private Session session;\r
59         private final String keyspace;\r
60         // If this is null, then we own session\r
61         private final AbsCassDAO<TRANS,?> owningDAO;\r
62         protected Class<DATA> dataClass;\r
63         private final String name;\r
64         private static Slot sessionSlot;\r
65         //private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<TransStore,?>.PSInfo>();\r
66         private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo>();\r
67         private static final List<Object> EMPTY = new ArrayList<Object>(0);\r
68         private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();\r
69         private static boolean resetTrigger = false;\r
70         private static long nextAvailableReset = 0;\r
71         \r
72 \r
73         public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {\r
74                 this.name = name;\r
75                 this.cluster = cluster;\r
76                 this.keyspace = keyspace;\r
77                 owningDAO = null;  // we own session\r
78                 session = null;\r
79                 this.dataClass = dataClass;\r
80                 \r
81         }\r
82 \r
83         public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {      \r
84                 this.name = name;\r
85                 cluster = aDao.cluster;\r
86                 keyspace = aDao.keyspace;\r
87                 session = null;\r
88                 owningDAO = aDao; // We do not own session\r
89                 this.dataClass = dataClass;\r
90         }\r
91         \r
92         public static void setSessionSlot(Slot slot) {\r
93                 sessionSlot = slot;\r
94         }\r
95 \r
96         //Note: Lower case ON PURPOSE. These names used to create History Messages\r
97         public enum CRUD {\r
98                 create,read,update,delete\r
99         ;\r
100 \r
101 }\r
102 \r
103         public class PSInfo {\r
104                 private BoundStatement ps;\r
105                 private final int size;\r
106                 private final Loader<DATA> loader;\r
107                 private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader\r
108                 private final String cql;\r
109                 private final ConsistencyLevel consistency;\r
110 \r
111 \r
112                 /**\r
113                  * Create a PSInfo and create Prepared Statement\r
114                  * \r
115                  * @param trans\r
116                  * @param theCQL\r
117                  * @param loader\r
118                  */\r
119                 public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {\r
120                         this.loader = loader;\r
121                         this.consistency=consistency;\r
122                         psinfos.add(this);\r
123 \r
124                         cql = theCQL.trim().toUpperCase();\r
125                         if(cql.startsWith("INSERT")) {\r
126                                 crud = CRUD.create;\r
127                         } else if(cql.startsWith("UPDATE")) {\r
128                                 crud = CRUD.update;\r
129                         } else if(cql.startsWith("DELETE")) {\r
130                                 crud = CRUD.delete;\r
131                         } else {\r
132                                 crud = CRUD.read;\r
133                         }\r
134                         \r
135                         int idx = 0, count=0;\r
136                         while((idx=cql.indexOf('?',idx))>=0) {\r
137                                 ++idx;\r
138                                 ++count;\r
139                         }\r
140                         size=count;\r
141                 }\r
142                 \r
143                 public synchronized void reset() {\r
144                         ps = null;\r
145                 }\r
146                 \r
147                 private BoundStatement ps(TransStore trans) throws APIException, IOException {\r
148                         if(ps==null) {\r
149                                 synchronized(this) {\r
150                                         if(ps==null) {\r
151                                                 TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);\r
152                                                 try {\r
153                                                         ps = new BoundStatement(getSession(trans).prepare(cql));\r
154                                                         ps.setConsistencyLevel(consistency);\r
155                                                 } catch (DriverException e) {\r
156                                                         reportPerhapsReset(trans,e);\r
157                                                         throw e;\r
158                                                 } finally {\r
159                                                         tt.done();\r
160                                                 }\r
161                                         }\r
162                                 }\r
163                         }\r
164                         return ps;\r
165                 }\r
166 \r
167                 /**\r
168                  * Execute a Prepared Statement by extracting from DATA object\r
169                  * \r
170                  * @param trans\r
171                  * @param text\r
172                  * @param data\r
173                  * @return\r
174                  */\r
175                 public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {\r
176                         TimeTaken tt = trans.start(text, Env.REMOTE);\r
177                         try {\r
178                                 return Result.ok(getSession(trans).executeAsync(\r
179                                                 ps(trans).bind(loader.extract(data, size, crud))));\r
180                         } catch (DriverException | APIException | IOException e) {\r
181                                 AbsCassDAO.this.reportPerhapsReset(trans,e);\r
182                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
183                         } finally {\r
184                                 tt.done();\r
185                         }\r
186                 }\r
187 \r
188                 /**\r
189                  * Execute a Prepared Statement on Object[] key\r
190                  * \r
191                  * @param trans\r
192                  * @param text\r
193                  * @param objs\r
194                  * @return\r
195                  */\r
196                 public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {\r
197                         TimeTaken tt = trans.start(text, Env.REMOTE);\r
198                         try {\r
199                                 return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));\r
200                         } catch (DriverException | APIException | IOException e) {\r
201                                 AbsCassDAO.this.reportPerhapsReset(trans,e);\r
202                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
203                         } finally {\r
204                                 tt.done();\r
205                         }\r
206                 }\r
207                 \r
208                 /* \r
209                  * Note:\r
210                  * \r
211                  */\r
212 \r
213                 /**\r
214                  * Execute a Prepared Statement by extracting from DATA object\r
215                  * \r
216                  * @param trans\r
217                  * @param text\r
218                  * @param data\r
219                  * @return\r
220                  */\r
221                 public Result<ResultSet> exec(TRANS trans, String text, DATA data) {\r
222                         TimeTaken tt = trans.start(text, Env.REMOTE);\r
223                         try {\r
224                                 /*\r
225                                  * "execute" (and executeAsync)\r
226                                  * Executes the provided query.\r
227                                         This method blocks until at least some result has been received from the database. However, \r
228                                         for SELECT queries, it does not guarantee that the result has been received in full. But it \r
229                                         does guarantee that some response has been received from the database, and in particular \r
230                                         guarantee that if the request is invalid, an exception will be thrown by this method.\r
231 \r
232                                         Parameters:\r
233                                         statement - the CQL query to execute (that can be any Statement).\r
234                                         Returns:\r
235                                                 the result of the query. That result will never be null but can be empty (and will \r
236                                                 be for any non SELECT query).\r
237                                  */\r
238                                 return Result.ok(getSession(trans).execute(\r
239                                                 ps(trans).bind(loader.extract(data, size, crud))));\r
240                         } catch (DriverException | APIException | IOException e) {\r
241                                 AbsCassDAO.this.reportPerhapsReset(trans,e);\r
242                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
243                         } finally {\r
244                                 tt.done();\r
245                         }\r
246                 }\r
247 \r
248                 /**\r
249                  * Execute a Prepared Statement on Object[] key\r
250                  * \r
251                  * @param trans\r
252                  * @param text\r
253                  * @param objs\r
254                  * @return\r
255                  */\r
256                 public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {\r
257                         TimeTaken tt = trans.start(text, Env.REMOTE);\r
258                         try {\r
259                                 return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));\r
260                         } catch (DriverException | APIException | IOException e) {\r
261                                 AbsCassDAO.this.reportPerhapsReset(trans,e);\r
262                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
263                         } finally {\r
264                                 tt.done();\r
265                         }\r
266                 }\r
267 \r
268                 /**\r
269                  * Read the Data from Cassandra given a Prepared Statement (defined by the\r
270                  * DAO Instance)\r
271                  *\r
272                  * This is common behavior among all DAOs.\r
273                  * @throws DAOException\r
274                  */\r
275                 public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {\r
276                         TimeTaken tt = trans.start(text,Env.REMOTE);\r
277                         \r
278                         ResultSet rs;\r
279                         try {\r
280                                 rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));\r
281 /// TEST CODE for Exception                             \r
282 //                              boolean force = true; \r
283 //                              if(force) {\r
284 //                                      Map<InetSocketAddress, Throwable> misa = new HashMap<InetSocketAddress,Throwable>();\r
285 //                                      //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));\r
286 //                                      misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));\r
287 //                                      throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);\r
288 ////                                    throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");\r
289 //                              }\r
290 //// END TEST CODE\r
291                         } catch (DriverException | APIException | IOException e) {\r
292                                 AbsCassDAO.this.reportPerhapsReset(trans,e);\r
293                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
294                         } finally {\r
295                                 tt.done();\r
296                         }\r
297                         \r
298                         return extract(loader,rs,null /*let Array be created if necessary*/,dflt);\r
299                 }\r
300                 \r
301                 public Result<List<DATA>> read(TRANS trans, String text, DATA data) {\r
302                         return read(trans,text, loader.extract(data, size, crud));\r
303                 }\r
304                 \r
305                 public Object[] keyFrom(DATA data) {\r
306                         return loader.extract(data, size, CRUD.delete); // Delete is key only\r
307                 }\r
308 \r
309                 /*\r
310                  * Note: in case PSInfos are deleted, we want to remove them from list.  This is not expected, \r
311                  * but we don't want a data leak if it does.  Finalize doesn't have to happen quickly\r
312                  */\r
313                 @Override\r
314                 protected void finalize() throws Throwable {\r
315                         psinfos.remove(this);\r
316                 }\r
317         }\r
318 \r
319         protected final Accept<DATA> dflt = new Accept<DATA>() {\r
320                 @Override\r
321                 public boolean ok(DATA data) {\r
322                         return true;\r
323                 }\r
324         };\r
325 \r
326 \r
327         @SuppressWarnings("unchecked")\r
328     protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {\r
329                 List<Row> rows = rs.all();\r
330                 if(rows.isEmpty()) {\r
331                         return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);\r
332                 } else {\r
333                         DATA d;\r
334                         List<DATA> data = indata==null?new ArrayList<DATA>(rows.size()):indata;\r
335                         \r
336                         for(Row row : rows) {\r
337                                 try {\r
338                                         d = loader.load(dataClass.newInstance(),row);\r
339                                         if(accept.ok(d)) {\r
340                                                 data.add(d);\r
341                                         }\r
342                                 } catch(Exception e) {\r
343                                         return Result.err(e);\r
344                                 }\r
345                         }\r
346                         return Result.ok(data);\r
347                 }\r
348     }\r
349     \r
350         private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";\r
351         private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";\r
352         private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";\r
353 \r
354         private static class ResetRequest {\r
355                 //package on purpose\r
356                 Session session;\r
357                 long timestamp;\r
358                 \r
359                 public ResetRequest(Session session) {\r
360                         this.session = session;\r
361                         timestamp = System.currentTimeMillis();\r
362                 }\r
363         }\r
364 \r
365         \r
366         public static final void primePSIs(TransStore trans) throws APIException, IOException {\r
367                 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {\r
368                         if(psi.ps==null) {\r
369                                 psi.ps(trans);\r
370                         }\r
371                 }\r
372         }\r
373         \r
374         public final Session getSession(TransStore trans) throws APIException, IOException {\r
375                 // Try to use Trans' session, if exists\r
376                 if(sessionSlot!=null) { // try to get from Trans\r
377                         Session sess = trans.get(sessionSlot, null);\r
378                         if(sess!=null) {\r
379                                 return sess;\r
380                         }\r
381                 }\r
382                 \r
383                 // If there's an owning DAO, use it's session\r
384                 if(owningDAO!=null) {\r
385                         return owningDAO.getSession(trans);\r
386                 }\r
387                 \r
388                 // OK, nothing else works... get our own.\r
389                 if(session==null || resetTrigger) {\r
390                         Cluster tempCluster = null;\r
391                         Session tempSession = null;\r
392                         try {\r
393                                 synchronized(NEW_CASSANDRA_SESSION_CREATED) {\r
394                                         boolean reset = false;\r
395                                         for(ResetRequest r : resetDeque) {\r
396                                                 if(r.session == session) {\r
397                                                         if(r.timestamp>nextAvailableReset) {\r
398                                                                 reset=true;\r
399                                                                 nextAvailableReset = System.currentTimeMillis() + 60000;\r
400                                                                 tempCluster = cluster;\r
401                                                                 tempSession = session;\r
402                                                                 break;\r
403                                                         } else {\r
404                                                                 trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");\r
405                                                         }\r
406                                                 }\r
407                                         }\r
408         \r
409                                         if(reset || session == null) {\r
410                                                 TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);\r
411                                                 try {\r
412                                                         // Note: Maitrayee recommended not closing the cluster, just\r
413                                                         // overwrite it. 9/30/2016 assuming same for Session\r
414                                                         // This was a bad idea.  Ran out of File Handles as I suspected..\r
415                                                         if(reset) {\r
416                                                                 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {\r
417                                                                         psi.reset();\r
418                                                                 }\r
419                                                         }\r
420                                                         if(reset || cluster==null) {\r
421                                                                 cluster = CassAccess.cluster(trans, keyspace);\r
422                                                                 trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);\r
423                                                         }\r
424                                                         if(reset || session==null) {\r
425                                                                 session = cluster.connect(keyspace);\r
426                                                                 trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);\r
427                                                         }\r
428                                                 } finally {\r
429                                                         resetTrigger=false;\r
430                                                         tt.done();\r
431                                                 }\r
432                                         }\r
433                                 }\r
434                         } finally {\r
435                                 TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);\r
436                                 try {\r
437                                         resetDeque.clear();\r
438                                         // Not clearing Session/Cluster appears to kill off FileHandles\r
439                                         if(tempSession!=null && !tempSession.isClosed()) {\r
440                                                 tempSession.close();\r
441                                         }\r
442                                         if(tempCluster!=null && !tempCluster.isClosed()) {\r
443                                                 tempCluster.close();\r
444                                         }\r
445                                 } finally {\r
446                                         tt.done();\r
447                                 }\r
448                         }\r
449                 }\r
450                 return session;\r
451         }\r
452         \r
453         public final boolean reportPerhapsReset(TransStore trans, Exception e) {\r
454                 if(owningDAO!=null) {\r
455                         return owningDAO.reportPerhapsReset(trans, e);\r
456                 } else {\r
457                         boolean rv = false;\r
458                         if(CassAccess.isResetException(e)) {\r
459                                 trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());\r
460                                 resetDeque.addFirst(new ResetRequest(session));\r
461                                 rv = resetTrigger = true;\r
462                         } \r
463                         trans.error().log(e);\r
464                         return rv;\r
465                 }\r
466         }\r
467 \r
468         public void close(TransStore trans) {\r
469                 if(owningDAO==null) {\r
470                         if(session!=null) {\r
471                                 TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);\r
472                                 try {\r
473                                         session.close();\r
474                                 } finally {\r
475                                         tt.done();\r
476                                 }\r
477                                 session = null;\r
478                         } else {\r
479                                 trans.debug().log("close called(), Session already closed");\r
480                         }\r
481                 } else {\r
482                         owningDAO.close(trans);\r
483                 }\r
484         }\r
485 \r
486         protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {\r
487         }\r
488         \r
489         protected interface Accept<DATA> {\r
490                 public boolean ok(DATA data);\r
491         }\r
492 \r
493 }\r
494 \r
495 \r
496 \r