c76a88f94afd6c3cd6db0a163e9a6ea224eca4e6
[aaf/authz.git] / authz-cass / src / main / java / org / onap / aaf / dao / AbsCassDAO.java
1 /*******************************************************************************\r
2  * ============LICENSE_START====================================================\r
3  * * org.onap.aaf\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * * \r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * * \r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 package org.onap.aaf.dao;\r
24 \r
25 import java.io.IOException;\r
26 import java.util.ArrayList;\r
27 import java.util.Deque;\r
28 import java.util.List;\r
29 import java.util.concurrent.ConcurrentLinkedDeque;\r
30 \r
31 import org.onap.aaf.authz.layer.Result;\r
32 import org.onap.aaf.dao.aaf.cass.Status;\r
33 \r
34 import org.onap.aaf.inno.env.APIException;\r
35 import org.onap.aaf.inno.env.Env;\r
36 import org.onap.aaf.inno.env.Slot;\r
37 import org.onap.aaf.inno.env.TimeTaken;\r
38 import org.onap.aaf.inno.env.TransStore;\r
39 import com.datastax.driver.core.BoundStatement;\r
40 import com.datastax.driver.core.Cluster;\r
41 import com.datastax.driver.core.ConsistencyLevel;\r
42 import com.datastax.driver.core.ResultSet;\r
43 import com.datastax.driver.core.ResultSetFuture;\r
44 import com.datastax.driver.core.Row;\r
45 import com.datastax.driver.core.Session;\r
46 import com.datastax.driver.core.exceptions.DriverException;\r
47 \r
48 public abstract class AbsCassDAO<TRANS extends TransStore,DATA> {\r
49         protected static final char DOT = '.';\r
50         protected static final char DOT_PLUS_ONE = '.'+1;\r
51         protected static final String FIRST_CHAR = Character.toString((char)0);\r
52         protected static final String LAST_CHAR = Character.toString((char)Character.MAX_VALUE);\r
53         protected static final int FIELD_COMMAS = 0;\r
54         protected static final int QUESTION_COMMAS = 1;\r
55         protected static final int ASSIGNMENT_COMMAS = 2;\r
56         protected static final int WHERE_ANDS = 3;\r
57         \r
58         private Cluster cluster; \r
59         private Session session;\r
60         private final String keyspace;\r
61         // If this is null, then we own session\r
62         private final AbsCassDAO<TRANS,?> owningDAO;\r
63         protected Class<DATA> dataClass;\r
64         private final String name;\r
65         private static Slot sessionSlot;\r
66         //private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<TransStore,?>.PSInfo>();\r
67         private static final ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo> psinfos = new ArrayList<AbsCassDAO<? extends TransStore,?>.PSInfo>();\r
68         private static final List<Object> EMPTY = new ArrayList<Object>(0);\r
69         private static final Deque<ResetRequest> resetDeque = new ConcurrentLinkedDeque<ResetRequest>();\r
70         private static boolean resetTrigger = false;\r
71         private static long nextAvailableReset = 0;\r
72         \r
73 \r
74         public AbsCassDAO(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass) {\r
75                 this.name = name;\r
76                 this.cluster = cluster;\r
77                 this.keyspace = keyspace;\r
78                 owningDAO = null;  // we own session\r
79                 session = null;\r
80                 this.dataClass = dataClass;\r
81                 \r
82         }\r
83 \r
84         public AbsCassDAO(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass) {      \r
85                 this.name = name;\r
86                 cluster = aDao.cluster;\r
87                 keyspace = aDao.keyspace;\r
88                 session = null;\r
89                 owningDAO = aDao; // We do not own session\r
90                 this.dataClass = dataClass;\r
91         }\r
92         \r
93         public static void setSessionSlot(Slot slot) {\r
94                 sessionSlot = slot;\r
95         }\r
96 \r
97         //Note: Lower case ON PURPOSE. These names used to create History Messages\r
98         public enum CRUD {\r
99                 create,read,update,delete\r
100         ;\r
101 \r
102 }\r
103 \r
104         public class PSInfo {\r
105                 private BoundStatement ps;\r
106                 private final int size;\r
107                 private final Loader<DATA> loader;\r
108                 private final CRUD crud; // Store CRUD, because it makes a difference in Object Order, see Loader\r
109                 private final String cql;\r
110                 private final ConsistencyLevel consistency;\r
111 \r
112 \r
113                 /**\r
114                  * Create a PSInfo and create Prepared Statement\r
115                  * \r
116                  * @param trans\r
117                  * @param theCQL\r
118                  * @param loader\r
119                  */\r
120                 public PSInfo(TRANS trans, String theCQL, Loader<DATA> loader, ConsistencyLevel consistency) {\r
121                         this.loader = loader;\r
122                         this.consistency=consistency;\r
123                         psinfos.add(this);\r
124 \r
125                         cql = theCQL.trim().toUpperCase();\r
126                         if(cql.startsWith("INSERT")) {\r
127                                 crud = CRUD.create;\r
128                         } else if(cql.startsWith("UPDATE")) {\r
129                                 crud = CRUD.update;\r
130                         } else if(cql.startsWith("DELETE")) {\r
131                                 crud = CRUD.delete;\r
132                         } else {\r
133                                 crud = CRUD.read;\r
134                         }\r
135                         \r
136                         int idx = 0, count=0;\r
137                         while((idx=cql.indexOf('?',idx))>=0) {\r
138                                 ++idx;\r
139                                 ++count;\r
140                         }\r
141                         size=count;\r
142                 }\r
143                 \r
144                 public synchronized void reset() {\r
145                         ps = null;\r
146                 }\r
147                 \r
148                 private BoundStatement ps(TransStore trans) throws APIException, IOException {\r
149                         if(ps==null) {\r
150                                 synchronized(this) {\r
151                                         if(ps==null) {\r
152                                                 TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB);\r
153                                                 try {\r
154                                                         ps = new BoundStatement(getSession(trans).prepare(cql));\r
155                                                         ps.setConsistencyLevel(consistency);\r
156                                                 } catch (DriverException e) {\r
157                                                         reportPerhapsReset(trans,e);\r
158                                                         throw e;\r
159                                                 } finally {\r
160                                                         tt.done();\r
161                                                 }\r
162                                         }\r
163                                 }\r
164                         }\r
165                         return ps;\r
166                 }\r
167 \r
168                 /**\r
169                  * Execute a Prepared Statement by extracting from DATA object\r
170                  * \r
171                  * @param trans\r
172                  * @param text\r
173                  * @param data\r
174                  * @return\r
175                  */\r
176                 public Result<ResultSetFuture> execAsync(TRANS trans, String text, DATA data) {\r
177                         TimeTaken tt = trans.start(text, Env.REMOTE);\r
178                         try {\r
179                                 return Result.ok(getSession(trans).executeAsync(\r
180                                                 ps(trans).bind(loader.extract(data, size, crud))));\r
181                         } catch (DriverException | APIException | IOException e) {\r
182                                 AbsCassDAO.this.reportPerhapsReset(trans,e);\r
183                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
184                         } finally {\r
185                                 tt.done();\r
186                         }\r
187                 }\r
188 \r
189                 /**\r
190                  * Execute a Prepared Statement on Object[] key\r
191                  * \r
192                  * @param trans\r
193                  * @param text\r
194                  * @param objs\r
195                  * @return\r
196                  */\r
197                 public Result<ResultSetFuture> execAsync(TRANS trans, String text, Object ... objs) {\r
198                         TimeTaken tt = trans.start(text, Env.REMOTE);\r
199                         try {\r
200                                 return Result.ok(getSession(trans).executeAsync(ps(trans).bind(objs)));\r
201                         } catch (DriverException | APIException | IOException e) {\r
202                                 AbsCassDAO.this.reportPerhapsReset(trans,e);\r
203                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
204                         } finally {\r
205                                 tt.done();\r
206                         }\r
207                 }\r
208                 \r
209                 /* \r
210                  * Note:\r
211                  * \r
212                  */\r
213 \r
214                 /**\r
215                  * Execute a Prepared Statement by extracting from DATA object\r
216                  * \r
217                  * @param trans\r
218                  * @param text\r
219                  * @param data\r
220                  * @return\r
221                  */\r
222                 public Result<ResultSet> exec(TRANS trans, String text, DATA data) {\r
223                         TimeTaken tt = trans.start(text, Env.REMOTE);\r
224                         try {\r
225                                 /*\r
226                                  * "execute" (and executeAsync)\r
227                                  * Executes the provided query.\r
228                                         This method blocks until at least some result has been received from the database. However, \r
229                                         for SELECT queries, it does not guarantee that the result has been received in full. But it \r
230                                         does guarantee that some response has been received from the database, and in particular \r
231                                         guarantee that if the request is invalid, an exception will be thrown by this method.\r
232 \r
233                                         Parameters:\r
234                                         statement - the CQL query to execute (that can be any Statement).\r
235                                         Returns:\r
236                                                 the result of the query. That result will never be null but can be empty (and will \r
237                                                 be for any non SELECT query).\r
238                                  */\r
239                                 return Result.ok(getSession(trans).execute(\r
240                                                 ps(trans).bind(loader.extract(data, size, crud))));\r
241                         } catch (DriverException | APIException | IOException e) {\r
242                                 AbsCassDAO.this.reportPerhapsReset(trans,e);\r
243                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
244                         } finally {\r
245                                 tt.done();\r
246                         }\r
247                 }\r
248 \r
249                 /**\r
250                  * Execute a Prepared Statement on Object[] key\r
251                  * \r
252                  * @param trans\r
253                  * @param text\r
254                  * @param objs\r
255                  * @return\r
256                  */\r
257                 public Result<ResultSet> exec(TRANS trans, String text, Object ... objs) {\r
258                         TimeTaken tt = trans.start(text, Env.REMOTE);\r
259                         try {\r
260                                 return Result.ok(getSession(trans).execute(ps(trans).bind(objs)));\r
261                         } catch (DriverException | APIException | IOException e) {\r
262                                 AbsCassDAO.this.reportPerhapsReset(trans,e);\r
263                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
264                         } finally {\r
265                                 tt.done();\r
266                         }\r
267                 }\r
268 \r
269                 /**\r
270                  * Read the Data from Cassandra given a Prepared Statement (defined by the\r
271                  * DAO Instance)\r
272                  *\r
273                  * This is common behavior among all DAOs.\r
274                  * @throws DAOException\r
275                  */\r
276                 public Result<List<DATA>> read(TRANS trans, String text, Object[] key) {\r
277                         TimeTaken tt = trans.start(text,Env.REMOTE);\r
278                         \r
279                         ResultSet rs;\r
280                         try {\r
281                                 rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key));\r
282 /// TEST CODE for Exception                             \r
283 //                              boolean force = true; \r
284 //                              if(force) {\r
285 //                                      Map<InetSocketAddress, Throwable> misa = new HashMap<InetSocketAddress,Throwable>();\r
286 //                                      //misa.put(new InetSocketAddress(444),new Exception("no host was tried"));\r
287 //                                      misa.put(new InetSocketAddress(444),new Exception("Connection has been closed"));\r
288 //                                      throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa);\r
289 ////                                    throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried");\r
290 //                              }\r
291 //// END TEST CODE\r
292                         } catch (DriverException | APIException | IOException e) {\r
293                                 AbsCassDAO.this.reportPerhapsReset(trans,e);\r
294                                 return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql);\r
295                         } finally {\r
296                                 tt.done();\r
297                         }\r
298                         \r
299                         return extract(loader,rs,null /*let Array be created if necessary*/,dflt);\r
300                 }\r
301                 \r
302                 public Result<List<DATA>> read(TRANS trans, String text, DATA data) {\r
303                         return read(trans,text, loader.extract(data, size, crud));\r
304                 }\r
305                 \r
306                 public Object[] keyFrom(DATA data) {\r
307                         return loader.extract(data, size, CRUD.delete); // Delete is key only\r
308                 }\r
309 \r
310                 /*\r
311                  * Note: in case PSInfos are deleted, we want to remove them from list.  This is not expected, \r
312                  * but we don't want a data leak if it does.  Finalize doesn't have to happen quickly\r
313                  */\r
314                 @Override\r
315                 protected void finalize() throws Throwable {\r
316                         psinfos.remove(this);\r
317                 }\r
318         }\r
319 \r
320         protected final Accept<DATA> dflt = new Accept<DATA>() {\r
321                 @Override\r
322                 public boolean ok(DATA data) {\r
323                         return true;\r
324                 }\r
325         };\r
326 \r
327 \r
328         @SuppressWarnings("unchecked")\r
329     protected final Result<List<DATA>> extract(Loader<DATA> loader, ResultSet rs, List<DATA> indata, Accept<DATA> accept) {\r
330                 List<Row> rows = rs.all();\r
331                 if(rows.isEmpty()) {\r
332                         return Result.ok((List<DATA>)EMPTY); // Result sets now .emptyList(true);\r
333                 } else {\r
334                         DATA d;\r
335                         List<DATA> data = indata==null?new ArrayList<DATA>(rows.size()):indata;\r
336                         \r
337                         for(Row row : rows) {\r
338                                 try {\r
339                                         d = loader.load(dataClass.newInstance(),row);\r
340                                         if(accept.ok(d)) {\r
341                                                 data.add(d);\r
342                                         }\r
343                                 } catch(Exception e) {\r
344                                         return Result.err(e);\r
345                                 }\r
346                         }\r
347                         return Result.ok(data);\r
348                 }\r
349     }\r
350     \r
        // Warning logged by getSession after it has connected a new session
        private static final String NEW_CASSANDRA_SESSION_CREATED = "New Cassandra Session Created";
        // Warning logged by getSession after it has obtained a new Cluster object
        private static final String NEW_CASSANDRA_CLUSTER_OBJECT_CREATED = "New Cassandra Cluster Object Created";
        // Label of the timing entry that covers session (re)creation in getSession
        private static final String NEW_CASSANDRA_SESSION = "New Cassandra Session";
354 \r
355         private static class ResetRequest {\r
356                 //package on purpose\r
357                 Session session;\r
358                 long timestamp;\r
359                 \r
360                 public ResetRequest(Session session) {\r
361                         this.session = session;\r
362                         timestamp = System.currentTimeMillis();\r
363                 }\r
364         }\r
365 \r
366         \r
367         public static final void primePSIs(TransStore trans) throws APIException, IOException {\r
368                 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {\r
369                         if(psi.ps==null) {\r
370                                 psi.ps(trans);\r
371                         }\r
372                 }\r
373         }\r
374         \r
375         public final Session getSession(TransStore trans) throws APIException, IOException {\r
376                 // Try to use Trans' session, if exists\r
377                 if(sessionSlot!=null) { // try to get from Trans\r
378                         Session sess = trans.get(sessionSlot, null);\r
379                         if(sess!=null) {\r
380                                 return sess;\r
381                         }\r
382                 }\r
383                 \r
384                 // If there's an owning DAO, use it's session\r
385                 if(owningDAO!=null) {\r
386                         return owningDAO.getSession(trans);\r
387                 }\r
388                 \r
389                 // OK, nothing else works... get our own.\r
390                 if(session==null || resetTrigger) {\r
391                         Cluster tempCluster = null;\r
392                         Session tempSession = null;\r
393                         try {\r
394                                 synchronized(NEW_CASSANDRA_SESSION_CREATED) {\r
395                                         boolean reset = false;\r
396                                         for(ResetRequest r : resetDeque) {\r
397                                                 if(r.session == session) {\r
398                                                         if(r.timestamp>nextAvailableReset) {\r
399                                                                 reset=true;\r
400                                                                 nextAvailableReset = System.currentTimeMillis() + 60000;\r
401                                                                 tempCluster = cluster;\r
402                                                                 tempSession = session;\r
403                                                                 break;\r
404                                                         } else {\r
405                                                                 trans.warn().log("Cassandra Connection Reset Ignored: Recent Reset");\r
406                                                         }\r
407                                                 }\r
408                                         }\r
409         \r
410                                         if(reset || session == null) {\r
411                                                 TimeTaken tt = trans.start(NEW_CASSANDRA_SESSION, Env.SUB);\r
412                                                 try {\r
413                                                         // Note: Maitrayee recommended not closing the cluster, just\r
414                                                         // overwrite it. 9/30/2016 assuming same for Session\r
415                                                         // This was a bad idea.  Ran out of File Handles as I suspected..\r
416                                                         if(reset) {\r
417                                                                 for(AbsCassDAO<? extends TransStore, ?>.PSInfo psi : psinfos) {\r
418                                                                         psi.reset();\r
419                                                                 }\r
420                                                         }\r
421                                                         if(reset || cluster==null) {\r
422                                                                 cluster = CassAccess.cluster(trans, keyspace);\r
423                                                                 trans.warn().log(NEW_CASSANDRA_CLUSTER_OBJECT_CREATED);\r
424                                                         }\r
425                                                         if(reset || session==null) {\r
426                                                                 session = cluster.connect(keyspace);\r
427                                                                 trans.warn().log(NEW_CASSANDRA_SESSION_CREATED);\r
428                                                         }\r
429                                                 } finally {\r
430                                                         resetTrigger=false;\r
431                                                         tt.done();\r
432                                                 }\r
433                                         }\r
434                                 }\r
435                         } finally {\r
436                                 TimeTaken tt = trans.start("Clear Reset Deque", Env.SUB);\r
437                                 try {\r
438                                         resetDeque.clear();\r
439                                         // Not clearing Session/Cluster appears to kill off FileHandles\r
440                                         if(tempSession!=null && !tempSession.isClosed()) {\r
441                                                 tempSession.close();\r
442                                         }\r
443                                         if(tempCluster!=null && !tempCluster.isClosed()) {\r
444                                                 tempCluster.close();\r
445                                         }\r
446                                 } finally {\r
447                                         tt.done();\r
448                                 }\r
449                         }\r
450                 }\r
451                 return session;\r
452         }\r
453         \r
454         public final boolean reportPerhapsReset(TransStore trans, Exception e) {\r
455                 if(owningDAO!=null) {\r
456                         return owningDAO.reportPerhapsReset(trans, e);\r
457                 } else {\r
458                         boolean rv = false;\r
459                         if(CassAccess.isResetException(e)) {\r
460                                 trans.warn().printf("Session Reset called for %s by %s ",session==null?"":session,e==null?"Mgmt Command":e.getClass().getName());\r
461                                 resetDeque.addFirst(new ResetRequest(session));\r
462                                 rv = resetTrigger = true;\r
463                         } \r
464                         trans.error().log(e);\r
465                         return rv;\r
466                 }\r
467         }\r
468 \r
469         public void close(TransStore trans) {\r
470                 if(owningDAO==null) {\r
471                         if(session!=null) {\r
472                                 TimeTaken tt = trans.start("Cassandra Session Close", Env.SUB);\r
473                                 try {\r
474                                         session.close();\r
475                                 } finally {\r
476                                         tt.done();\r
477                                 }\r
478                                 session = null;\r
479                         } else {\r
480                                 trans.debug().log("close called(), Session already closed");\r
481                         }\r
482                 } else {\r
483                         owningDAO.close(trans);\r
484                 }\r
485         }\r
486 \r
487         protected void wasModified(TRANS trans, CRUD modified, DATA data, String ... override) {\r
488         }\r
489         \r
490         protected interface Accept<DATA> {\r
491                 public boolean ok(DATA data);\r
492         }\r
493 \r
494 }\r
495 \r
496 \r
497 \r