[AAF-21] Updated Copyright Headers for AAF
[aaf/authz.git] / authz-cass / src / main / java / com / att / dao / aaf / cass / CacheInfoDAO.java
1 /*******************************************************************************\r
2  * ============LICENSE_START====================================================\r
3  * * org.onap.aaf\r
4  * * ===========================================================================\r
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
6  * * ===========================================================================\r
7  * * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * * you may not use this file except in compliance with the License.\r
9  * * You may obtain a copy of the License at\r
10  * * \r
11  *  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * * \r
13  *  * Unless required by applicable law or agreed to in writing, software\r
14  * * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * * See the License for the specific language governing permissions and\r
17  * * limitations under the License.\r
18  * * ============LICENSE_END====================================================\r
19  * *\r
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
21  * *\r
22  ******************************************************************************/\r
23 package com.att.dao.aaf.cass;\r
24 \r
25 import java.io.IOException;\r
26 import java.net.HttpURLConnection;\r
27 import java.net.URI;\r
28 import java.util.Date;\r
29 import java.util.HashMap;\r
30 import java.util.HashSet;\r
31 import java.util.Map;\r
32 import java.util.Map.Entry;\r
33 import java.util.concurrent.BlockingQueue;\r
34 import java.util.concurrent.ConcurrentHashMap;\r
35 import java.util.concurrent.LinkedBlockingQueue;\r
36 import java.util.concurrent.TimeUnit;\r
37 \r
38 import com.att.authz.env.AuthzEnv;\r
39 import com.att.authz.env.AuthzTrans;\r
40 import com.att.authz.layer.Result;\r
41 import com.att.cadi.CadiException;\r
42 import com.att.cadi.SecuritySetter;\r
43 import com.att.cadi.client.Future;\r
44 import com.att.cadi.client.Rcli;\r
45 import com.att.cadi.client.Retryable;\r
46 import com.att.cadi.http.HMangr;\r
47 import com.att.dao.AbsCassDAO;\r
48 import com.att.dao.CIDAO;\r
49 import com.att.dao.CassAccess;\r
50 import com.att.dao.CassDAOImpl;\r
51 import com.att.dao.Loader;\r
52 import com.att.inno.env.APIException;\r
53 import com.att.inno.env.Env;\r
54 import com.att.inno.env.TimeTaken;\r
55 import com.att.inno.env.Trans;\r
56 import com.datastax.driver.core.BoundStatement;\r
57 import com.datastax.driver.core.Cluster;\r
58 import com.datastax.driver.core.ResultSet;\r
59 import com.datastax.driver.core.Row;\r
60 import com.datastax.driver.core.exceptions.DriverException;\r
61 \r
62 public class CacheInfoDAO extends CassDAOImpl<AuthzTrans,CacheInfoDAO.Data> implements CIDAO<AuthzTrans> {\r
63 \r
64         private static final String TABLE = "cache";\r
65         public static final Map<String,Date[]> info = new ConcurrentHashMap<String,Date[]>();\r
66 \r
67         private static CacheUpdate cacheUpdate;\r
68         \r
69         \r
70         private BoundStatement check;\r
71         // Hold current time stamps from Tables\r
72         private final Date startTime;\r
73         \r
74         public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException {\r
75                 super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));\r
76                 startTime = new Date();\r
77                 init(trans);\r
78         }\r
79 \r
80         public CacheInfoDAO(AuthzTrans trans, AbsCassDAO<AuthzTrans,?> aDao) throws APIException, IOException {\r
81                 super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));\r
82                 startTime = new Date();\r
83                 init(trans);\r
84         }\r
85 \r
86 \r
87     //////////////////////////////////////////\r
88     // Data Definition, matches Cassandra DM\r
89     //////////////////////////////////////////\r
90     private static final int KEYLIMIT = 2;\r
91         /**\r
92      */\r
93         public static class Data {\r
94                 public Data() {\r
95                         name = null;\r
96                         touched = null;\r
97                 }\r
98                 public Data(String name, int seg) {\r
99                         this.name = name;\r
100                         this.seg = seg;\r
101                         touched = null;\r
102                 }\r
103                 \r
104                 public String           name;\r
105                 public int                      seg;\r
106                 public Date                     touched;\r
107     }\r
108 \r
109     private static class InfoLoader extends Loader<Data> {\r
110         public static final InfoLoader dflt = new InfoLoader(KEYLIMIT);\r
111         \r
112                 public InfoLoader(int keylimit) {\r
113                         super(keylimit);\r
114                 }\r
115                 \r
116                 @Override\r
117                 public Data load(Data data, Row row) {\r
118                         // Int more efficient\r
119                         data.name = row.getString(0);\r
120                         data.seg = row.getInt(1);\r
121                         data.touched = row.getDate(2);\r
122                         return data;\r
123                 }\r
124 \r
125                 @Override\r
126                 protected void key(Data data, int _idx, Object[] obj) {\r
127                         int idx = _idx;\r
128 \r
129                         obj[idx]=data.name;\r
130                         obj[++idx]=data.seg;\r
131                 }\r
132 \r
133                 @Override\r
134                 protected void body(Data data, int idx, Object[] obj) {\r
135                         obj[idx]=data.touched;\r
136                 }\r
137     }\r
138     \r
139         public static<T extends Trans> void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {\r
140                 if(cacheUpdate==null) {\r
141                         Thread t= new Thread(cacheUpdate = new CacheUpdate(env,hman,ss, ip,port),"CacheInfo Update Thread");\r
142                         t.setDaemon(true);\r
143                         t.start();\r
144                 }\r
145         }\r
146 \r
147         public static<T extends Trans> void stopUpdate() {\r
148                 if(cacheUpdate!=null) {\r
149                         cacheUpdate.go=false;\r
150                 }\r
151         }\r
152 \r
153         private final static class CacheUpdate extends Thread {\r
154                 public static BlockingQueue<Transfer> notifyDQ = new LinkedBlockingQueue<Transfer>(2000);\r
155 \r
156                 private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0";\r
157                 private AuthzEnv env;\r
158                 private HMangr hman;\r
159                 private SecuritySetter<HttpURLConnection> ss;\r
160                 private final String authority;\r
161                 public boolean go = true;\r
162                 \r
163                 public CacheUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {\r
164                         this.env = env;\r
165                         this.hman = hman;\r
166                         this.ss = ss;\r
167                         \r
168                         this.authority = ip+':'+port;\r
169                 }\r
170                 \r
171                 private static class Transfer {\r
172                         public String table;\r
173                         public int segs[];\r
174                         public Transfer(String table, int[] segs)  {\r
175                                 this.table = table;\r
176                                 this.segs = segs;\r
177                         }\r
178                 }\r
179                 private class CacheClear extends Retryable<Integer> {\r
180                         public int total=0;\r
181                         private AuthzTrans trans;\r
182                         private String type;\r
183                         private String segs;\r
184                         \r
185                         public CacheClear(AuthzTrans trans) {\r
186                                 this.trans = trans;\r
187                         }\r
188 \r
189                         public void set(Entry<String, IntHolder> es) {\r
190                                 type = es.getKey();\r
191                                 segs = es.getValue().toString();\r
192                         }\r
193                         \r
194                 @Override\r
195                         public Integer code(Rcli<?> client) throws APIException, CadiException {\r
196                                 URI to = client.getURI();\r
197                                 if(!to.getAuthority().equals(authority)) {\r
198                                         Future<Void> f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT);\r
199                                         if(f.get(hman.readTimeout())) {\r
200                                             ++total;\r
201                                         } else {\r
202                                             trans.error().log("Error During AAF Peer Notify",f.code(),f.body());\r
203                                         }\r
204                                 }\r
205                                 return total;\r
206                         }\r
207                 }\r
208                 \r
209                 private class IntHolder {\r
210                         private int[] raw;\r
211                         HashSet<Integer> set;\r
212                         \r
213                         public IntHolder(int ints[]) {\r
214                                 raw = ints;\r
215                                 set = null;\r
216                         }\r
217                         public void add(int[] ints) {\r
218                                 if(set==null) {\r
219                                         set = new HashSet<Integer>();\r
220                                         \r
221                                         for(int i=0;i<raw.length;++i) {\r
222                                                 set.add(raw[i]);\r
223                                         }\r
224                                 }\r
225                                 for(int i=0;i<ints.length;++i) {\r
226                                         set.add(ints[i]);\r
227                                 }\r
228                         }\r
229 \r
230                         @Override\r
231                         public String toString() {\r
232                                 StringBuilder sb = new StringBuilder();\r
233                                 boolean first = true;\r
234                                 if(set==null) {\r
235                                         for(int i : raw) {\r
236                                                 if(first) {\r
237                                                         first=false;\r
238                                                 } else {\r
239                                                         sb.append(',');\r
240                                                 }\r
241                                                 sb.append(i);\r
242                                         }\r
243                                 } else {\r
244                                         for(Integer i : set) {\r
245                                                 if(first) {\r
246                                                         first=false;\r
247                                                 } else {\r
248                                                         sb.append(',');\r
249                                                 }\r
250                                                 sb.append(i);\r
251                                         }\r
252                                 }\r
253                                 return sb.toString();\r
254                         }\r
255                 }\r
256                 \r
257                 @Override\r
258                 public void run() {\r
259                         do {\r
260                                 try {\r
261                                         Transfer data = notifyDQ.poll(4,TimeUnit.SECONDS);\r
262                                         if(data==null) {\r
263                                                 continue;\r
264                                         }\r
265                                         \r
266                                         int count = 0;\r
267                                         CacheClear cc = null;\r
268                                         Map<String,IntHolder> gather = null;\r
269                                         AuthzTrans trans = null;\r
270                                         long start=0;\r
271                                         // Do a block poll first\r
272                                         do {\r
273                                                 if(gather==null) {\r
274                                                         start = System.nanoTime();\r
275                                                         trans = env.newTransNoAvg();\r
276                                                         cc = new CacheClear(trans);\r
277                                                         gather = new HashMap<String,IntHolder>();\r
278                                                 }\r
279                                                 IntHolder prev = gather.get(data.table);\r
280                                                 if(prev==null) {\r
281                                                         gather.put(data.table,new IntHolder(data.segs));\r
282                                                 } else {\r
283                                                         prev.add(data.segs);\r
284                                                 }\r
285                                                 // continue while there is data\r
286                                         } while((data = notifyDQ.poll())!=null);\r
287                                         if(gather!=null) {\r
288                                                 for(Entry<String, IntHolder> es : gather.entrySet()) {\r
289                                                         cc.set(es);\r
290                                                         try {\r
291                                                                 if(hman.all(ss, cc, false)!=null) {\r
292                                                                         ++count;\r
293                                                                 }\r
294                                                         } catch (Exception e) {\r
295                                                                 trans.error().log(e, "Error on Cache Update");\r
296                                                         }\r
297                                                 }\r
298                                                 if(env.debug().isLoggable()) {\r
299                                                         float millis = (System.nanoTime()-start)/1000000f;\r
300                                                         StringBuilder sb = new StringBuilder("Direct Cache Refresh: ");\r
301                                                         sb.append("Updated ");\r
302                                                         sb.append(count);\r
303                                                         if(count==1) {\r
304                                                                 sb.append(" entry for ");\r
305                                                         } else { \r
306                                                                 sb.append(" entries for ");\r
307                                                         }\r
308                                                         int peers = count<=0?0:cc.total/count;\r
309                                                         sb.append(peers);\r
310                                                         sb.append(" client");\r
311                                                         if(peers!=1) {\r
312                                                                 sb.append('s');\r
313                                                         }\r
314                                                         sb.append(" in ");\r
315                                                         sb.append(millis);\r
316                                                         sb.append("ms");\r
317                                                         trans.auditTrail(0, sb, Env.REMOTE);\r
318                                                         env.debug().log(sb);\r
319                                                 }\r
320                                         }\r
321                                 } catch (InterruptedException e1) {\r
322                                         go = false;\r
323                                 }\r
324                         } while(go);\r
325                 }\r
326         }\r
327 \r
328         private void init(AuthzTrans trans) throws APIException, IOException {\r
329                 \r
330                 String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt);\r
331                 check = getSession(trans).prepare(SELECT_SP +  helpers[FIELD_COMMAS] + " FROM " + TABLE).bind();\r
332 \r
333                 disable(CRUD.create);\r
334                 disable(CRUD.delete);\r
335         }\r
336 \r
337         /* (non-Javadoc)\r
338          * @see com.att.dao.aaf.cass.CIDAO#touch(com.att.authz.env.AuthzTrans, java.lang.String, int)\r
339          */\r
340         \r
341         @Override\r
342         public Result<Void> touch(AuthzTrans trans, String name, int ... seg) {\r
343                 /////////////\r
344                 // Direct Service Cache Invalidation\r
345                 /////////////\r
346                 // ConcurrentQueues are open-ended.  We don't want any Memory leaks \r
347                 // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive\r
348                 if(cacheUpdate!=null) {\r
349                         try {\r
350                                 if(!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) {\r
351                                         trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" );\r
352                                 }\r
353                         } catch (InterruptedException e) {\r
354                                 trans.error().log("Cache Notify Queue posting was interrupted" );\r
355                         }\r
356                 }\r
357 \r
358                 /////////////\r
359                 // Table Based Cache Invalidation (original)\r
360                 /////////////\r
361                 // Note: Save time with multiple Sequence Touches, but PreparedStmt doesn't support IN\r
362                 StringBuilder start = new StringBuilder("CacheInfoDAO Touch segments ");\r
363                 start.append(name);\r
364                 start.append(": ");\r
365                 StringBuilder sb = new StringBuilder("BEGIN BATCH\n");\r
366                 boolean first = true;\r
367                 for(int s : seg) {\r
368                         sb.append(UPDATE_SP);\r
369                         sb.append(TABLE);\r
370                         sb.append(" SET touched=dateof(now()) WHERE name = '");\r
371                         sb.append(name);\r
372                         sb.append("' AND seg = ");\r
373                         sb.append(s);\r
374                         sb.append(";\n");       \r
375                         if(first) {\r
376                                 first =false;\r
377                         } else {\r
378                                 start.append(',');\r
379                         }\r
380                         start.append(s);\r
381                 }\r
382                 sb.append("APPLY BATCH;");\r
383                 TimeTaken tt = trans.start(start.toString(),Env.REMOTE);\r
384                 try {\r
385                         getSession(trans).executeAsync(sb.toString());\r
386                 } catch (DriverException | APIException | IOException e) {\r
387                         reportPerhapsReset(trans,e);\r
388                         return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);\r
389                 } finally {\r
390                         tt.done();\r
391                 }\r
392                 return Result.ok();\r
393         }\r
394 \r
395         /* (non-Javadoc)\r
396          * @see com.att.dao.aaf.cass.CIDAO#check(com.att.authz.env.AuthzTrans)\r
397          */\r
398         @Override\r
399         public Result<Void> check(AuthzTrans trans) {\r
400                 ResultSet rs;\r
401                 TimeTaken tt = trans.start("Check Table Timestamps",Env.REMOTE);\r
402                 try {\r
403                         rs = getSession(trans).execute(check);\r
404                 } catch (DriverException | APIException | IOException e) {\r
405                         reportPerhapsReset(trans,e);\r
406                         return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);\r
407                 } finally {\r
408                         tt.done();\r
409                 }\r
410                 \r
411                 String lastName = null;\r
412                 Date[] dates = null;\r
413                 for(Row row : rs.all()) {\r
414                         String name = row.getString(0);\r
415                         int seg = row.getInt(1);\r
416                         if(!name.equals(lastName)) {\r
417                                 dates = info.get(name);\r
418                                 lastName=name;\r
419                         }\r
420                         if(dates==null) {\r
421                                 dates=new Date[seg+1];\r
422                                 info.put(name,dates);\r
423                         } else if(dates.length<=seg) {\r
424                                 Date[] temp = new Date[seg+1];\r
425                                 System.arraycopy(dates, 0, temp, 0, dates.length);\r
426                                 dates = temp;\r
427                                 info.put(name, dates);\r
428                         }\r
429                         Date temp = row.getDate(2);\r
430                         if(dates[seg]==null || dates[seg].before(temp)) {\r
431                                 dates[seg]=temp;\r
432                         }\r
433                 }\r
434                 return Result.ok();\r
435         }\r
436         \r
437     /* (non-Javadoc)\r
438          * @see com.att.dao.aaf.cass.CIDAO#get(java.lang.String, int)\r
439          */\r
440     @Override\r
441         public Date get(AuthzTrans trans, String table, int seg) {\r
442                 Date[] dates = info.get(table);\r
443                 if(dates==null) {\r
444                         dates = new Date[seg+1];\r
445                         touch(trans,table, seg);\r
446                 } else if(dates.length<=seg) {\r
447                         Date[] temp = new Date[seg+1];\r
448                         System.arraycopy(dates, 0, temp, 0, dates.length);\r
449                         dates = temp;\r
450                 }\r
451                 Date rv = dates[seg];\r
452                 if(rv==null) {\r
453                         rv=dates[seg]=startTime;\r
454                 }\r
455                 return rv;\r
456         }\r
457 \r
458         @Override\r
459         protected void wasModified(AuthzTrans trans, CRUD modified, Data data, String ... override) {\r
460                 // Do nothing\r
461         }\r
462 \r
463 }\r