AT&T 2.0.19 Code drop, stage 4
[aaf/authz.git] / authz-cass / src / main / java / org / onap / aaf / dao / aaf / cass / CacheInfoDAO.java
1 /*******************************************************************************\r
2  * ============LICENSE_START====================================================\r
3  * * org.onap.aaf\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * * \r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * * \r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 package org.onap.aaf.dao.aaf.cass;\r
24 \r
25 import java.io.IOException;\r
26 import java.net.HttpURLConnection;\r
27 import java.net.URI;\r
28 import java.util.Date;\r
29 import java.util.HashMap;\r
30 import java.util.HashSet;\r
31 import java.util.Map;\r
32 import java.util.Map.Entry;\r
33 import java.util.concurrent.BlockingQueue;\r
34 import java.util.concurrent.ConcurrentHashMap;\r
35 import java.util.concurrent.LinkedBlockingQueue;\r
36 import java.util.concurrent.TimeUnit;\r
37 \r
38 import org.onap.aaf.authz.env.AuthzEnv;\r
39 import org.onap.aaf.authz.env.AuthzTrans;\r
40 import org.onap.aaf.authz.layer.Result;\r
41 import org.onap.aaf.dao.AbsCassDAO;\r
42 import org.onap.aaf.dao.CIDAO;\r
43 import org.onap.aaf.dao.CassAccess;\r
44 import org.onap.aaf.dao.CassDAOImpl;\r
45 import org.onap.aaf.dao.Loader;\r
46 \r
47 import org.onap.aaf.cadi.CadiException;\r
48 import org.onap.aaf.cadi.SecuritySetter;\r
49 import org.onap.aaf.cadi.client.Future;\r
50 import org.onap.aaf.cadi.client.Rcli;\r
51 import org.onap.aaf.cadi.client.Retryable;\r
52 import org.onap.aaf.cadi.http.HMangr;\r
53 import org.onap.aaf.inno.env.APIException;\r
54 import org.onap.aaf.inno.env.Env;\r
55 import org.onap.aaf.inno.env.TimeTaken;\r
56 import org.onap.aaf.inno.env.Trans;\r
57 import com.datastax.driver.core.BoundStatement;\r
58 import com.datastax.driver.core.Cluster;\r
59 import com.datastax.driver.core.ResultSet;\r
60 import com.datastax.driver.core.Row;\r
61 import com.datastax.driver.core.exceptions.DriverException;\r
62 \r
63 public class CacheInfoDAO extends CassDAOImpl<AuthzTrans,CacheInfoDAO.Data> implements CIDAO<AuthzTrans> {\r
64 \r
65         private static final String TABLE = "cache";\r
66         public static final Map<String,Date[]> info = new ConcurrentHashMap<String,Date[]>();\r
67 \r
68         private static CacheUpdate cacheUpdate;\r
69         \r
70         \r
71         private BoundStatement check;\r
72         // Hold current time stamps from Tables\r
73         private final Date startTime;\r
74         \r
75         public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException {\r
76                 super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));\r
77                 startTime = new Date();\r
78                 init(trans);\r
79         }\r
80 \r
81         public CacheInfoDAO(AuthzTrans trans, AbsCassDAO<AuthzTrans,?> aDao) throws APIException, IOException {\r
82                 super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));\r
83                 startTime = new Date();\r
84                 init(trans);\r
85         }\r
86 \r
87 \r
88     //////////////////////////////////////////\r
89     // Data Definition, matches Cassandra DM\r
90     //////////////////////////////////////////\r
91     private static final int KEYLIMIT = 2;\r
92         /**\r
93      */\r
94         public static class Data {\r
95                 public Data() {\r
96                         name = null;\r
97                         touched = null;\r
98                 }\r
99                 public Data(String name, int seg) {\r
100                         this.name = name;\r
101                         this.seg = seg;\r
102                         touched = null;\r
103                 }\r
104                 \r
105                 public String           name;\r
106                 public int                      seg;\r
107                 public Date                     touched;\r
108     }\r
109 \r
110     private static class InfoLoader extends Loader<Data> {\r
111         public static final InfoLoader dflt = new InfoLoader(KEYLIMIT);\r
112         \r
113                 public InfoLoader(int keylimit) {\r
114                         super(keylimit);\r
115                 }\r
116                 \r
117                 @Override\r
118                 public Data load(Data data, Row row) {\r
119                         // Int more efficient\r
120                         data.name = row.getString(0);\r
121                         data.seg = row.getInt(1);\r
122                         data.touched = row.getDate(2);\r
123                         return data;\r
124                 }\r
125 \r
126                 @Override\r
127                 protected void key(Data data, int _idx, Object[] obj) {\r
128                         int idx = _idx;\r
129 \r
130                         obj[idx]=data.name;\r
131                         obj[++idx]=data.seg;\r
132                 }\r
133 \r
134                 @Override\r
135                 protected void body(Data data, int idx, Object[] obj) {\r
136                         obj[idx]=data.touched;\r
137                 }\r
138     }\r
139     \r
140         public static<T extends Trans> void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {\r
141                 if(cacheUpdate==null) {\r
142                         Thread t= new Thread(cacheUpdate = new CacheUpdate(env,hman,ss, ip,port),"CacheInfo Update Thread");\r
143                         t.setDaemon(true);\r
144                         t.start();\r
145                 }\r
146         }\r
147 \r
148         public static<T extends Trans> void stopUpdate() {\r
149                 if(cacheUpdate!=null) {\r
150                         cacheUpdate.go=false;\r
151                 }\r
152         }\r
153 \r
154         private final static class CacheUpdate extends Thread {\r
155                 public static BlockingQueue<Transfer> notifyDQ = new LinkedBlockingQueue<Transfer>(2000);\r
156 \r
157                 private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0";\r
158                 private AuthzEnv env;\r
159                 private HMangr hman;\r
160                 private SecuritySetter<HttpURLConnection> ss;\r
161                 private final String authority;\r
162                 public boolean go = true;\r
163                 \r
164                 public CacheUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {\r
165                         this.env = env;\r
166                         this.hman = hman;\r
167                         this.ss = ss;\r
168                         \r
169                         this.authority = ip+':'+port;\r
170                 }\r
171                 \r
172                 private static class Transfer {\r
173                         public String table;\r
174                         public int segs[];\r
175                         public Transfer(String table, int[] segs)  {\r
176                                 this.table = table;\r
177                                 this.segs = segs;\r
178                         }\r
179                 }\r
180                 private class CacheClear extends Retryable<Integer> {\r
181                         public int total=0;\r
182                         private AuthzTrans trans;\r
183                         private String type;\r
184                         private String segs;\r
185                         \r
186                         public CacheClear(AuthzTrans trans) {\r
187                                 this.trans = trans;\r
188                         }\r
189 \r
190                         public void set(Entry<String, IntHolder> es) {\r
191                                 type = es.getKey();\r
192                                 segs = es.getValue().toString();\r
193                         }\r
194                         \r
195                 @Override\r
196                         public Integer code(Rcli<?> client) throws APIException, CadiException {\r
197                                 URI to = client.getURI();\r
198                                 if(!to.getAuthority().equals(authority)) {\r
199                                         Future<Void> f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT);\r
200                                         if(f.get(hman.readTimeout())) {\r
201                                             ++total;\r
202                                         } else {\r
203                                             trans.error().log("Error During AAF Peer Notify",f.code(),f.body());\r
204                                         }\r
205                                 }\r
206                                 return total;\r
207                         }\r
208                 }\r
209                 \r
210                 private class IntHolder {\r
211                         private int[] raw;\r
212                         HashSet<Integer> set;\r
213                         \r
214                         public IntHolder(int ints[]) {\r
215                                 raw = ints;\r
216                                 set = null;\r
217                         }\r
218                         public void add(int[] ints) {\r
219                                 if(set==null) {\r
220                                         set = new HashSet<Integer>();\r
221                                         \r
222                                         for(int i=0;i<raw.length;++i) {\r
223                                                 set.add(raw[i]);\r
224                                         }\r
225                                 }\r
226                                 for(int i=0;i<ints.length;++i) {\r
227                                         set.add(ints[i]);\r
228                                 }\r
229                         }\r
230 \r
231                         @Override\r
232                         public String toString() {\r
233                                 StringBuilder sb = new StringBuilder();\r
234                                 boolean first = true;\r
235                                 if(set==null) {\r
236                                         for(int i : raw) {\r
237                                                 if(first) {\r
238                                                         first=false;\r
239                                                 } else {\r
240                                                         sb.append(',');\r
241                                                 }\r
242                                                 sb.append(i);\r
243                                         }\r
244                                 } else {\r
245                                         for(Integer i : set) {\r
246                                                 if(first) {\r
247                                                         first=false;\r
248                                                 } else {\r
249                                                         sb.append(',');\r
250                                                 }\r
251                                                 sb.append(i);\r
252                                         }\r
253                                 }\r
254                                 return sb.toString();\r
255                         }\r
256                 }\r
257                 \r
258                 @Override\r
259                 public void run() {\r
260                         do {\r
261                                 try {\r
262                                         Transfer data = notifyDQ.poll(4,TimeUnit.SECONDS);\r
263                                         if(data==null) {\r
264                                                 continue;\r
265                                         }\r
266                                         \r
267                                         int count = 0;\r
268                                         CacheClear cc = null;\r
269                                         Map<String,IntHolder> gather = null;\r
270                                         AuthzTrans trans = null;\r
271                                         long start=0;\r
272                                         // Do a block poll first\r
273                                         do {\r
274                                                 if(gather==null) {\r
275                                                         start = System.nanoTime();\r
276                                                         trans = env.newTransNoAvg();\r
277                                                         cc = new CacheClear(trans);\r
278                                                         gather = new HashMap<String,IntHolder>();\r
279                                                 }\r
280                                                 IntHolder prev = gather.get(data.table);\r
281                                                 if(prev==null) {\r
282                                                         gather.put(data.table,new IntHolder(data.segs));\r
283                                                 } else {\r
284                                                         prev.add(data.segs);\r
285                                                 }\r
286                                                 // continue while there is data\r
287                                         } while((data = notifyDQ.poll())!=null);\r
288                                         if(gather!=null) {\r
289                                                 for(Entry<String, IntHolder> es : gather.entrySet()) {\r
290                                                         cc.set(es);\r
291                                                         try {\r
292                                                                 if(hman.all(ss, cc, false)!=null) {\r
293                                                                         ++count;\r
294                                                                 }\r
295                                                         } catch (Exception e) {\r
296                                                                 trans.error().log(e, "Error on Cache Update");\r
297                                                         }\r
298                                                 }\r
299                                                 if(env.debug().isLoggable()) {\r
300                                                         float millis = (System.nanoTime()-start)/1000000f;\r
301                                                         StringBuilder sb = new StringBuilder("Direct Cache Refresh: ");\r
302                                                         sb.append("Updated ");\r
303                                                         sb.append(count);\r
304                                                         if(count==1) {\r
305                                                                 sb.append(" entry for ");\r
306                                                         } else { \r
307                                                                 sb.append(" entries for ");\r
308                                                         }\r
309                                                         int peers = count<=0?0:cc.total/count;\r
310                                                         sb.append(peers);\r
311                                                         sb.append(" client");\r
312                                                         if(peers!=1) {\r
313                                                                 sb.append('s');\r
314                                                         }\r
315                                                         sb.append(" in ");\r
316                                                         sb.append(millis);\r
317                                                         sb.append("ms");\r
318                                                         trans.auditTrail(0, sb, Env.REMOTE);\r
319                                                         env.debug().log(sb);\r
320                                                 }\r
321                                         }\r
322                                 } catch (InterruptedException e1) {\r
323                                         go = false;\r
324                                 }\r
325                         } while(go);\r
326                 }\r
327         }\r
328 \r
329         private void init(AuthzTrans trans) throws APIException, IOException {\r
330                 \r
331                 String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt);\r
332                 check = getSession(trans).prepare(SELECT_SP +  helpers[FIELD_COMMAS] + " FROM " + TABLE).bind();\r
333 \r
334                 disable(CRUD.create);\r
335                 disable(CRUD.delete);\r
336         }\r
337 \r
338         /* (non-Javadoc)\r
339          * @see org.onap.aaf.dao.aaf.cass.CIDAO#touch(org.onap.aaf.authz.env.AuthzTrans, java.lang.String, int)\r
340          */\r
341         \r
342         @Override\r
343         public Result<Void> touch(AuthzTrans trans, String name, int ... seg) {\r
344                 /////////////\r
345                 // Direct Service Cache Invalidation\r
346                 /////////////\r
347                 // ConcurrentQueues are open-ended.  We don't want any Memory leaks \r
348                 // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive\r
349                 if(cacheUpdate!=null) {\r
350                         try {\r
351                                 if(!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) {\r
352                                         trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" );\r
353                                 }\r
354                         } catch (InterruptedException e) {\r
355                                 trans.error().log("Cache Notify Queue posting was interrupted" );\r
356                         }\r
357                 }\r
358 \r
359                 /////////////\r
360                 // Table Based Cache Invalidation (original)\r
361                 /////////////\r
362                 // Note: Save time with multiple Sequence Touches, but PreparedStmt doesn't support IN\r
363                 StringBuilder start = new StringBuilder("CacheInfoDAO Touch segments ");\r
364                 start.append(name);\r
365                 start.append(": ");\r
366                 StringBuilder sb = new StringBuilder("BEGIN BATCH\n");\r
367                 boolean first = true;\r
368                 for(int s : seg) {\r
369                         sb.append(UPDATE_SP);\r
370                         sb.append(TABLE);\r
371                         sb.append(" SET touched=dateof(now()) WHERE name = '");\r
372                         sb.append(name);\r
373                         sb.append("' AND seg = ");\r
374                         sb.append(s);\r
375                         sb.append(";\n");       \r
376                         if(first) {\r
377                                 first =false;\r
378                         } else {\r
379                                 start.append(',');\r
380                         }\r
381                         start.append(s);\r
382                 }\r
383                 sb.append("APPLY BATCH;");\r
384                 TimeTaken tt = trans.start(start.toString(),Env.REMOTE);\r
385                 try {\r
386                         getSession(trans).executeAsync(sb.toString());\r
387                 } catch (DriverException | APIException | IOException e) {\r
388                         reportPerhapsReset(trans,e);\r
389                         return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);\r
390                 } finally {\r
391                         tt.done();\r
392                 }\r
393                 return Result.ok();\r
394         }\r
395 \r
396         /* (non-Javadoc)\r
397          * @see org.onap.aaf.dao.aaf.cass.CIDAO#check(org.onap.aaf.authz.env.AuthzTrans)\r
398          */\r
399         @Override\r
400         public Result<Void> check(AuthzTrans trans) {\r
401                 ResultSet rs;\r
402                 TimeTaken tt = trans.start("Check Table Timestamps",Env.REMOTE);\r
403                 try {\r
404                         rs = getSession(trans).execute(check);\r
405                 } catch (DriverException | APIException | IOException e) {\r
406                         reportPerhapsReset(trans,e);\r
407                         return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);\r
408                 } finally {\r
409                         tt.done();\r
410                 }\r
411                 \r
412                 String lastName = null;\r
413                 Date[] dates = null;\r
414                 for(Row row : rs.all()) {\r
415                         String name = row.getString(0);\r
416                         int seg = row.getInt(1);\r
417                         if(!name.equals(lastName)) {\r
418                                 dates = info.get(name);\r
419                                 lastName=name;\r
420                         }\r
421                         if(dates==null) {\r
422                                 dates=new Date[seg+1];\r
423                                 info.put(name,dates);\r
424                         } else if(dates.length<=seg) {\r
425                                 Date[] temp = new Date[seg+1];\r
426                                 System.arraycopy(dates, 0, temp, 0, dates.length);\r
427                                 dates = temp;\r
428                                 info.put(name, dates);\r
429                         }\r
430                         Date temp = row.getDate(2);\r
431                         if(dates[seg]==null || dates[seg].before(temp)) {\r
432                                 dates[seg]=temp;\r
433                         }\r
434                 }\r
435                 return Result.ok();\r
436         }\r
437         \r
438     /* (non-Javadoc)\r
439          * @see org.onap.aaf.dao.aaf.cass.CIDAO#get(java.lang.String, int)\r
440          */\r
441     @Override\r
442         public Date get(AuthzTrans trans, String table, int seg) {\r
443                 Date[] dates = info.get(table);\r
444                 if(dates==null) {\r
445                         dates = new Date[seg+1];\r
446                         touch(trans,table, seg);\r
447                 } else if(dates.length<=seg) {\r
448                         Date[] temp = new Date[seg+1];\r
449                         System.arraycopy(dates, 0, temp, 0, dates.length);\r
450                         dates = temp;\r
451                 }\r
452                 Date rv = dates[seg];\r
453                 if(rv==null) {\r
454                         rv=dates[seg]=startTime;\r
455                 }\r
456                 return rv;\r
457         }\r
458 \r
459         @Override\r
460         protected void wasModified(AuthzTrans trans, CRUD modified, Data data, String ... override) {\r
461                 // Do nothing\r
462         }\r
463 \r
464 }\r