Collection syntax change because of Sonar
[aaf/authz.git] / auth / auth-cass / src / main / java / org / onap / aaf / auth / dao / cass / CacheInfoDAO.java
1 /**
2  * ============LICENSE_START====================================================
3  * org.onap.aaf
4  * ===========================================================================
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6  * ===========================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END====================================================
19  *
20  */
21
22 package org.onap.aaf.auth.dao.cass;
23
24 import java.io.IOException;
25 import java.net.HttpURLConnection;
26 import java.net.URI;
27 import java.util.Date;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36
37 import org.onap.aaf.auth.dao.AbsCassDAO;
38 import org.onap.aaf.auth.dao.CIDAO;
39 import org.onap.aaf.auth.dao.CassAccess;
40 import org.onap.aaf.auth.dao.CassDAOImpl;
41 import org.onap.aaf.auth.dao.Loader;
42 import org.onap.aaf.auth.env.AuthzEnv;
43 import org.onap.aaf.auth.env.AuthzTrans;
44 import org.onap.aaf.auth.layer.Result;
45 import org.onap.aaf.cadi.CadiException;
46 import org.onap.aaf.cadi.SecuritySetter;
47 import org.onap.aaf.cadi.client.Future;
48 import org.onap.aaf.cadi.client.Rcli;
49 import org.onap.aaf.cadi.client.Retryable;
50 import org.onap.aaf.cadi.http.HMangr;
51 import org.onap.aaf.misc.env.APIException;
52 import org.onap.aaf.misc.env.Env;
53 import org.onap.aaf.misc.env.TimeTaken;
54 import org.onap.aaf.misc.env.Trans;
55
56 import com.datastax.driver.core.BoundStatement;
57 import com.datastax.driver.core.Cluster;
58 import com.datastax.driver.core.PreparedStatement;
59 import com.datastax.driver.core.ResultSet;
60 import com.datastax.driver.core.Row;
61 import com.datastax.driver.core.exceptions.DriverException;
62
63 public class CacheInfoDAO extends CassDAOImpl<AuthzTrans,CacheInfoDAO.Data> implements CIDAO<AuthzTrans> {
64
65         private static final String TABLE = "cache";
66         public static final Map<String,Date[]> info = new ConcurrentHashMap<>();
67
68         private static CacheUpdate cacheUpdate;
69         
70         // Hold current time stamps from Tables
71         private final Date startTime;
72         private PreparedStatement psCheck;
73         
74         public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException {
75                 super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
76                 startTime = new Date();
77                 init(trans);
78         }
79
80         public CacheInfoDAO(AuthzTrans trans, AbsCassDAO<AuthzTrans,?> aDao) throws APIException, IOException {
81                 super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
82                 startTime = new Date();
83                 init(trans);
84         }
85
86
87     //////////////////////////////////////////
88     // Data Definition, matches Cassandra DM
89     //////////////////////////////////////////
90     private static final int KEYLIMIT = 2;
91         /**
92      * @author Jonathan
93      */
94         public static class Data {
95                 public Data() {
96                         name = null;
97                         touched = null;
98                 }
99                 public Data(String name, int seg) {
100                         this.name = name;
101                         this.seg = seg;
102                         touched = null;
103                 }
104                 
105                 public String           name;
106                 public int                      seg;
107                 public Date                     touched;
108     }
109
110     private static class InfoLoader extends Loader<Data> {
111         public static final InfoLoader dflt = new InfoLoader(KEYLIMIT);
112         
113                 public InfoLoader(int keylimit) {
114                         super(keylimit);
115                 }
116                 
117                 @Override
118                 public Data load(Data data, Row row) {
119                         // Int more efficient
120                         data.name = row.getString(0);
121                         data.seg = row.getInt(1);
122                         data.touched = row.getTimestamp(2);
123                         return data;
124                 }
125
126                 @Override
127                 protected void key(Data data, int _idx, Object[] obj) {
128                         int idx = _idx;
129
130                         obj[idx]=data.name;
131                         obj[++idx]=data.seg;
132                 }
133
134                 @Override
135                 protected void body(Data data, int idx, Object[] obj) {
136                         obj[idx]=data.touched;
137                 }
138     }
139     
140         public static<T extends Trans> void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
141                 if(cacheUpdate==null) {
142                         Thread t= new Thread(cacheUpdate = new CacheUpdate(env,hman,ss, ip,port),"CacheInfo Update Thread");
143                         t.setDaemon(true);
144                         t.start();
145                 }
146         }
147
148         public static<T extends Trans> void stopUpdate() {
149                 if(cacheUpdate!=null) {
150                         cacheUpdate.go=false;
151                 }
152         }
153
154         private final static class CacheUpdate extends Thread {
155                 public static BlockingQueue<Transfer> notifyDQ = new LinkedBlockingQueue<Transfer>(2000);
156
157                 private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0";
158                 private AuthzEnv env;
159                 private HMangr hman;
160                 private SecuritySetter<HttpURLConnection> ss;
161                 private final String authority;
162                 public boolean go = true;
163                 
164                 public CacheUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
165                         this.env = env;
166                         this.hman = hman;
167                         this.ss = ss;
168                         
169                         this.authority = ip+':'+port;
170                 }
171                 
172                 private static class Transfer {
173                         public String table;
174                         public int segs[];
175                         public Transfer(String table, int[] segs)  {
176                                 this.table = table;
177                                 this.segs = segs;
178                         }
179                 }
180                 private class CacheClear extends Retryable<Integer> {
181                         public int total=0;
182                         private AuthzTrans trans;
183                         private String type;
184                         private String segs;
185                         
186                         public CacheClear(AuthzTrans trans) {
187                                 this.trans = trans;
188                         }
189
190                         public void set(Entry<String, IntHolder> es) {
191                                 type = es.getKey();
192                                 segs = es.getValue().toString();
193                         }
194                         
195                 @Override
196                         public Integer code(Rcli<?> client) throws APIException, CadiException {
197                                 URI to = client.getURI();
198                                 if(!to.getAuthority().equals(authority)) {
199                                         Future<Void> f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT);
200                                         if(f.get(hman.readTimeout())) {
201                                             ++total;
202                                         } else {
203                                             trans.error().log("Error During AAF Peer Notify",f.code(),f.body());
204                                         }
205                                 }
206                                 return total;
207                         }
208                 }
209                 
210                 private class IntHolder {
211                         private int[] raw;
212                         HashSet<Integer> set;
213                         
214                         public IntHolder(int ints[]) {
215                                 raw = ints;
216                                 set = null;
217                         }
218                         public void add(int[] ints) {
219                                 if(set==null) {
220                                         set = new HashSet<>();
221                                         
222                                         for(int i=0;i<raw.length;++i) {
223                                                 set.add(raw[i]);
224                                         }
225                                 }
226                                 for(int i=0;i<ints.length;++i) {
227                                         set.add(ints[i]);
228                                 }
229                         }
230
231                         @Override
232                         public String toString() {
233                                 StringBuilder sb = new StringBuilder();
234                                 boolean first = true;
235                                 if(set==null) {
236                                         for(int i : raw) {
237                                                 if(first) {
238                                                         first=false;
239                                                 } else {
240                                                         sb.append(',');
241                                                 }
242                                                 sb.append(i);
243                                         }
244                                 } else {
245                                         for(Integer i : set) {
246                                                 if(first) {
247                                                         first=false;
248                                                 } else {
249                                                         sb.append(',');
250                                                 }
251                                                 sb.append(i);
252                                         }
253                                 }
254                                 return sb.toString();
255                         }
256                 }
257                 
258                 @Override
259                 public void run() {
260                         do {
261                                 try {
262                                         Transfer data = notifyDQ.poll(4,TimeUnit.SECONDS);
263                                         if(data==null) {
264                                                 continue;
265                                         }
266                                         
267                                         int count = 0;
268                                         CacheClear cc = null;
269                                         Map<String,IntHolder> gather = null;
270                                         AuthzTrans trans = null;
271                                         long start=0;
272                                         // Do a block poll first
273                                         do {
274                                                 if(gather==null) {
275                                                         start = System.nanoTime();
276                                                         trans = env.newTransNoAvg();
277                                                         cc = new CacheClear(trans);
278                                                         gather = new HashMap<>();
279                                                 }
280                                                 IntHolder prev = gather.get(data.table);
281                                                 if(prev==null) {
282                                                         gather.put(data.table,new IntHolder(data.segs));
283                                                 } else {
284                                                         prev.add(data.segs);
285                                                 }
286                                                 // continue while there is data
287                                         } while((data = notifyDQ.poll())!=null);
288                                         if(gather!=null) {
289                                                 for(Entry<String, IntHolder> es : gather.entrySet()) {
290                                                         cc.set(es);
291                                                         try {
292                                                                 if(hman.all(ss, cc, false)!=null) {
293                                                                         ++count;
294                                                                 }
295                                                         } catch (Exception e) {
296                                                                 trans.error().log(e, "Error on Cache Update");
297                                                         }
298                                                 }
299                                                 if(env.debug().isLoggable()) {
300                                                         float millis = (System.nanoTime()-start)/1000000f;
301                                                         StringBuilder sb = new StringBuilder("Direct Cache Refresh: ");
302                                                         sb.append("Updated ");
303                                                         sb.append(count);
304                                                         if(count==1) {
305                                                                 sb.append(" entry for ");
306                                                         } else { 
307                                                                 sb.append(" entries for ");
308                                                         }
309                                                         int peers = count<=0?0:cc.total/count;
310                                                         sb.append(peers);
311                                                         sb.append(" client");
312                                                         if(peers!=1) {
313                                                                 sb.append('s');
314                                                         }
315                                                         sb.append(" in ");
316                                                         sb.append(millis);
317                                                         sb.append("ms");
318                                                         trans.auditTrail(0, sb, Env.REMOTE);
319                                                         env.debug().log(sb);
320                                                 }
321                                         }
322                                 } catch (InterruptedException e1) {
323                                         go = false;
324                                         Thread.currentThread().interrupt();
325                                 }
326                         } while(go);
327                 }
328         }
329
330         private void init(AuthzTrans trans) throws APIException, IOException {
331                 
332                 String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt);
333                 psCheck = getSession(trans).prepare(SELECT_SP +  helpers[FIELD_COMMAS] + " FROM " + TABLE);
334
335                 disable(CRUD.create);
336                 disable(CRUD.delete);
337         }
338
339         /* (non-Javadoc)
340          * @see org.onap.aaf.auth.dao.cass.CIDAO#touch(org.onap.aaf.auth.env.test.AuthzTrans, java.lang.String, int)
341          */
342         
343         @Override
344         public Result<Void> touch(AuthzTrans trans, String name, int ... seg) {
345                 /////////////
346                 // Direct Service Cache Invalidation
347                 /////////////
348                 // ConcurrentQueues are open-ended.  We don't want any Memory leaks 
349                 // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive
350                 if(cacheUpdate!=null) {
351                         try {
352                                 if(!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) {
353                                         trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" );
354                                 }
355                         } catch (InterruptedException e) {
356                                 trans.error().log("Cache Notify Queue posting was interrupted" );
357                                 Thread.currentThread().interrupt();
358                         }
359                 }
360
361                 /////////////
362                 // Table Based Cache Invalidation (original)
363                 /////////////
364                 // Note: Save time with multiple Sequence Touches, but PreparedStmt doesn't support IN
365                 StringBuilder start = new StringBuilder("CacheInfoDAO Touch segments ");
366                 start.append(name);
367                 start.append(": ");
368                 StringBuilder sb = new StringBuilder("BEGIN BATCH\n");
369                 boolean first = true;
370                 for(int s : seg) {
371                         sb.append(UPDATE_SP);
372                         sb.append(TABLE);
373                         sb.append(" SET touched=dateof(now()) WHERE name = '");
374                         sb.append(name);
375                         sb.append("' AND seg = ");
376                         sb.append(s);
377                         sb.append(";\n");       
378                         if(first) {
379                                 first =false;
380                         } else {
381                                 start.append(',');
382                         }
383                         start.append(s);
384                 }
385                 sb.append("APPLY BATCH;");
386                 TimeTaken tt = trans.start(start.toString(),Env.REMOTE);
387                 try {
388                         getSession(trans).executeAsync(sb.toString());
389                 } catch (DriverException | APIException | IOException e) {
390                         reportPerhapsReset(trans,e);
391                         return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
392                 } finally {
393                         tt.done();
394                 }
395                 return Result.ok();
396         }
397
398         /* (non-Javadoc)
399          * @see org.onap.aaf.auth.dao.cass.CIDAO#check(org.onap.aaf.auth.env.test.AuthzTrans)
400          */
401         @Override
402         public Result<Void> check(AuthzTrans trans) {
403                 ResultSet rs;
404                 TimeTaken tt = trans.start("Check Table Timestamps",Env.REMOTE);
405                 try {
406                         rs = getSession(trans).execute(new BoundStatement(psCheck));
407                 } catch (DriverException | APIException | IOException e) {
408                         reportPerhapsReset(trans,e);
409                         return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
410                 } finally {
411                         tt.done();
412                 }
413                 
414                 String lastName = null;
415                 Date[] dates = null;
416                 for(Row row : rs.all()) {
417                         String name = row.getString(0);
418                         int seg = row.getInt(1);
419                         if(!name.equals(lastName)) {
420                                 dates = info.get(name);
421                                 lastName=name;
422                         }
423                         if(dates==null) {
424                                 dates=new Date[seg+1];
425                                 info.put(name,dates);
426                         } else if(dates.length<=seg) {
427                                 Date[] temp = new Date[seg+1];
428                                 System.arraycopy(dates, 0, temp, 0, dates.length);
429                                 dates = temp;
430                                 info.put(name, dates);
431                         }
432                         Date temp = row.getTimestamp(2);
433                         if(dates[seg]==null || dates[seg].before(temp)) {
434                                 dates[seg]=temp;
435                         }
436                 }
437                 return Result.ok();
438         }
439         
440     /* (non-Javadoc)
441          * @see org.onap.aaf.auth.dao.cass.CIDAO#get(java.lang.String, int)
442          */
443     @Override
444         public Date get(AuthzTrans trans, String table, int seg) {
445                 Date[] dates = info.get(table);
446                 if(dates==null) {
447                         dates = new Date[seg+1];
448                         touch(trans,table, seg);
449                 } else if(dates.length<=seg) {
450                         Date[] temp = new Date[seg+1];
451                         System.arraycopy(dates, 0, temp, 0, dates.length);
452                         dates = temp;
453                 }
454                 Date rv = dates[seg];
455                 if(rv==null) {
456                         rv=dates[seg]=startTime;
457                 }
458                 return rv;
459         }
460
461         @Override
462         protected void wasModified(AuthzTrans trans, CRUD modified, Data data, String ... override) {
463                 // Do nothing
464         }
465
466 }