Remove Tabs, per Jococo
[aaf/authz.git] / auth / auth-cass / src / main / java / org / onap / aaf / auth / dao / cass / CacheInfoDAO.java
1 /**
2  * ============LICENSE_START====================================================
3  * org.onap.aaf
4  * ===========================================================================
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6  *
7  * Modification Copyright (c) 2019 IBM
8  * ===========================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END====================================================
21  *
22  */
23
24 package org.onap.aaf.auth.dao.cass;
25
26 import java.io.IOException;
27 import java.net.HttpURLConnection;
28 import java.net.URI;
29 import java.util.Date;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.TimeUnit;
38
39 import org.onap.aaf.auth.dao.AbsCassDAO;
40 import org.onap.aaf.auth.dao.CIDAO;
41 import org.onap.aaf.auth.dao.CassAccess;
42 import org.onap.aaf.auth.dao.CassDAOImpl;
43 import org.onap.aaf.auth.dao.Loader;
44 import org.onap.aaf.auth.env.AuthzEnv;
45 import org.onap.aaf.auth.env.AuthzTrans;
46 import org.onap.aaf.auth.layer.Result;
47 import org.onap.aaf.cadi.CadiException;
48 import org.onap.aaf.cadi.SecuritySetter;
49 import org.onap.aaf.cadi.client.Future;
50 import org.onap.aaf.cadi.client.Rcli;
51 import org.onap.aaf.cadi.client.Retryable;
52 import org.onap.aaf.cadi.config.Config;
53 import org.onap.aaf.cadi.http.HMangr;
54 import org.onap.aaf.misc.env.APIException;
55 import org.onap.aaf.misc.env.Env;
56 import org.onap.aaf.misc.env.TimeTaken;
57 import org.onap.aaf.misc.env.Trans;
58
59 import com.datastax.driver.core.BoundStatement;
60 import com.datastax.driver.core.Cluster;
61 import com.datastax.driver.core.PreparedStatement;
62 import com.datastax.driver.core.ResultSet;
63 import com.datastax.driver.core.Row;
64 import com.datastax.driver.core.exceptions.DriverException;
65
66 public class CacheInfoDAO extends CassDAOImpl<AuthzTrans,CacheInfoDAO.Data> implements CIDAO<AuthzTrans> {
67
68     private static final String TABLE = "cache";
69     public static final Map<String,Date[]> info = new ConcurrentHashMap<>();
70
71     private static CacheUpdate cacheUpdate;
72     
73     // Hold current time stamps from Tables
74     private final Date startTime;
75     private final boolean cacheNotify;
76     private PreparedStatement psCheck;
77
78     //////////////////////////////////////////
79     // Data Definition, matches Cassandra DM
80     //////////////////////////////////////////
81     private static final int KEYLIMIT = 2;
82     
83     public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException {
84         super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
85         startTime = new Date();
86         cacheNotify = noK8s(trans);
87         init(trans);
88     }
89
90     public CacheInfoDAO(AuthzTrans trans, AbsCassDAO<AuthzTrans,?> aDao) throws APIException, IOException {
91         super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
92         startTime = new Date();
93         init(trans);
94         cacheNotify = noK8s(trans);
95     }
96
97     /** 
98      * Need a different point to point cache clear strategy for K8s... 
99      * @param trans
100      * @return
101      */
102     private boolean noK8s(AuthzTrans trans) {
103         String container = trans.getProperty(Config.AAF_LOCATOR_CONTAINER);
104         return ! ("helm".equals(container) || "oom".equals(container));
105     }
106     /**
107      * @author Jonathan
108      */
109     public static class Data {
110         public String        name;
111         public int            seg;
112         public Date            touched;
113
114         public Data() {
115             name = null;
116             touched = null;
117         }
118         public Data(String name, int seg) {
119             this.name = name;
120             this.seg = seg;
121             touched = null;
122         }
123     }
124
125     private static class InfoLoader extends Loader<Data> {
126         public static final InfoLoader dflt = new InfoLoader(KEYLIMIT);
127         
128         public InfoLoader(int keylimit) {
129             super(keylimit);
130         }
131         
132         @Override
133         public Data load(Data data, Row row) {
134             // Int more efficient
135             data.name = row.getString(0);
136             data.seg = row.getInt(1);
137             data.touched = row.getTimestamp(2);
138             return data;
139         }
140
141         @Override
142         protected void key(Data data, int idxParam, Object[] obj) {
143                 int idx = idxParam;
144
145             obj[idx]=data.name;
146             obj[++idx]=data.seg;
147         }
148
149         @Override
150         protected void body(Data data, int idx, Object[] obj) {
151             obj[idx]=data.touched;
152         }
153     }
154     
155     public static synchronized <T extends Trans> void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
156         if (cacheUpdate==null) {
157             cacheUpdate = new CacheUpdate(env,hman,ss, ip,port);
158             Thread t= new Thread(cacheUpdate,"CacheInfo Update Thread");
159             t.setDaemon(true);
160             t.start();
161         }
162     }
163
164     public static<T extends Trans> void stopUpdate() {
165         if (cacheUpdate!=null) {
166             cacheUpdate.go=false;
167         }
168     }
169
170     private static final class CacheUpdate extends Thread {
171         public static BlockingQueue<Transfer> notifyDQ = new LinkedBlockingQueue<Transfer>(2000);
172
173         private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0";
174         private AuthzEnv env;
175         private HMangr hman;
176         private SecuritySetter<HttpURLConnection> ss;
177         private final String authority;
178         public boolean go = true;
179         
180         public CacheUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
181             this.env = env;
182             this.hman = hman;
183             this.ss = ss;
184             
185             this.authority = ip+':'+port;
186         }
187         
188         private static class Transfer {
189             public String table;
190             public int segs[];
191             public Transfer(String table, int[] segs)  {
192                 this.table = table;
193                 this.segs = segs;
194             }
195         }
196         private class CacheClear extends Retryable<Integer> {
197             public int total=0;
198             private AuthzTrans trans;
199             private String type;
200             private String segs;
201             
202             public CacheClear(AuthzTrans trans) {
203                 this.trans = trans;
204             }
205
206             public void set(Entry<String, IntHolder> es) {
207                 type = es.getKey();
208                 segs = es.getValue().toString();
209             }
210             
211         @Override
212             public Integer code(Rcli<?> client) throws APIException, CadiException {
213                 URI to = client.getURI();
214                 if (!to.getAuthority().equals(authority)) {
215                     Future<Void> f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT);
216                     if (f.get(hman.readTimeout())) {
217                         ++total;
218                     } else {
219                         trans.error().log("Error During AAF Peer Notify",f.code(),f.body());
220                     }
221                 }
222                 return total;
223             }
224         }
225         
226         private class IntHolder {
227             private int[] raw;
228             HashSet<Integer> set;
229             
230             public IntHolder(int ints[]) {
231                 raw = ints;
232                 set = null;
233             }
234             public void add(int[] ints) {
235                 if (set==null) {
236                     set = new HashSet<>();
237                     
238                     for (int i=0;i<raw.length;++i) {
239                         set.add(raw[i]);
240                     }
241                 }
242                 for (int i=0;i<ints.length;++i) {
243                     set.add(ints[i]);
244                 }
245             }
246
247             @Override
248             public String toString() {
249                 StringBuilder sb = new StringBuilder();
250                 boolean first = true;
251                 if (set==null) {
252                     for (int i : raw) {
253                         if (first) {
254                             first=false;
255                         } else {
256                             sb.append(',');
257                         }
258                         sb.append(i);
259                     }
260                 } else {
261                     for (Integer i : set) {
262                         if (first) {
263                             first=false;
264                         } else {
265                             sb.append(',');
266                         }
267                         sb.append(i);
268                     }
269                 }
270                 return sb.toString();
271             }
272         }
273         
274         @Override
275         public void run() {
276             do {
277                 try {
278                     Transfer data = notifyDQ.poll(4,TimeUnit.SECONDS);
279                     if (data==null) {
280                         continue;
281                     }
282                     
283                     int count = 0;
284                     CacheClear cc = null;
285                     Map<String,IntHolder> gather = null;
286                     AuthzTrans trans = null;
287                     long start=0;
288                     // Do a block poll first
289                     do {
290                         if (gather==null) {
291                             start = System.nanoTime();
292                             trans = env.newTransNoAvg();
293                             cc = new CacheClear(trans);
294                             gather = new HashMap<>();
295                         }
296                         IntHolder prev = gather.get(data.table);
297                         if (prev==null) {
298                             gather.put(data.table,new IntHolder(data.segs));
299                         } else {
300                             prev.add(data.segs);
301                         }
302                         // continue while there is data
303                     } while ((data = notifyDQ.poll())!=null);
304                     if (gather!=null) {
305                         for (Entry<String, IntHolder> es : gather.entrySet()) {
306                             cc.set(es);
307                             try {
308                                 if (hman.all(ss, cc, false)!=null) {
309                                     ++count;
310                                 }
311                             } catch (Exception e) {
312                                 trans.error().log(e, "Error on Cache Update");
313                             }
314                         }
315                         if (env.debug().isLoggable()) {
316                             float millis = (System.nanoTime()-start)/1000000f;
317                             StringBuilder sb = new StringBuilder("Direct Cache Refresh: ");
318                             sb.append("Updated ");
319                             sb.append(count);
320                             if (count==1) {
321                                 sb.append(" entry for ");
322                             } else { 
323                                 sb.append(" entries for ");
324                             }
325                             int peers = count<=0?0:cc.total/count;
326                             sb.append(peers);
327                             sb.append(" client");
328                             if (peers!=1) {
329                                 sb.append('s');
330                             }
331                             sb.append(" in ");
332                             sb.append(millis);
333                             sb.append("ms");
334                             trans.auditTrail(0, sb, Env.REMOTE);
335                             env.debug().log(sb);
336                         }
337                     }
338                 } catch (InterruptedException e1) {
339                     go = false;
340                     Thread.currentThread().interrupt();
341                 }
342             } while (go);
343         }
344     }
345
346     private void init(AuthzTrans trans) throws APIException, IOException {
347         String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt);
348         psCheck = getSession(trans).prepare(SELECT_SP +  helpers[FIELD_COMMAS] + " FROM " + TABLE);
349
350         disable(CRUD.create);
351         disable(CRUD.delete);
352     }
353
354     /* (non-Javadoc)
355      * @see org.onap.aaf.auth.dao.cass.CIDAO#touch(org.onap.aaf.auth.env.test.AuthzTrans, java.lang.String, int)
356      */
357     
358     @Override
359     public Result<Void> touch(AuthzTrans trans, String name, int ... seg) {
360         /////////////
361         // Direct Service Cache Invalidation
362         /////////////
363         // ConcurrentQueues are open-ended.  We don't want any Memory leaks 
364         // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive
365         if (cacheNotify && cacheUpdate!=null) {
366             try {
367                 if (!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) {
368                     trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" );
369                 }
370             } catch (InterruptedException e) {
371                 trans.error().log("Cache Notify Queue posting was interrupted" );
372                 Thread.currentThread().interrupt();
373             }
374         }
375
376         /////////////
377         // Table Based Cache Invalidation (original)
378         /////////////
379         // Note: Save time with multiple Sequence Touches, but PreparedStmt doesn't support IN
380         StringBuilder start = new StringBuilder("CacheInfoDAO Touch segments ");
381         start.append(name);
382         start.append(": ");
383         StringBuilder sb = new StringBuilder("BEGIN BATCH\n");
384         boolean first = true;
385         for (int s : seg) {
386             sb.append(UPDATE_SP);
387             sb.append(TABLE);
388             sb.append(" SET touched=dateof(now()) WHERE name = '");
389             sb.append(name);
390             sb.append("' AND seg = ");
391             sb.append(s);
392             sb.append(";\n");    
393             if (first) {
394                 first =false;
395             } else {
396                 start.append(',');
397             }
398             start.append(s);
399         }
400         sb.append("APPLY BATCH;");
401         TimeTaken tt = trans.start(start.toString(),Env.REMOTE);
402         try {
403             getSession(trans).executeAsync(sb.toString());
404         } catch (DriverException | APIException | IOException e) {
405             reportPerhapsReset(trans,e);
406             return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
407         } finally {
408             tt.done();
409         }
410         return Result.ok();
411     }
412
413     /* (non-Javadoc)
414      * @see org.onap.aaf.auth.dao.cass.CIDAO#check(org.onap.aaf.auth.env.test.AuthzTrans)
415      */
416     @Override
417     public Result<Void> check(AuthzTrans trans) {
418         ResultSet rs;
419         TimeTaken tt = trans.start("Check Table Timestamps",Env.REMOTE);
420         try {
421             rs = getSession(trans).execute(new BoundStatement(psCheck));
422         } catch (DriverException | APIException | IOException e) {
423             reportPerhapsReset(trans,e);
424             return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
425         } finally {
426             tt.done();
427         }
428         
429         String lastName = null;
430         Date[] dates = null;
431         for (Row row : rs.all()) {
432             String name = row.getString(0);
433             int seg = row.getInt(1);
434             if (!name.equals(lastName)) {
435                 dates = info.get(name);
436                 lastName=name;
437             }
438             if (dates==null) {
439                 dates=new Date[seg+1];
440                 info.put(name,dates);
441             } else if (dates.length<=seg) {
442                 Date[] temp = new Date[seg+1];
443                 System.arraycopy(dates, 0, temp, 0, dates.length);
444                 dates = temp;
445                 info.put(name, dates);
446             }
447             Date temp = row.getTimestamp(2);
448             if (dates[seg]==null || dates[seg].before(temp)) {
449                 dates[seg]=temp;
450             }
451         }
452         return Result.ok();
453     }
454     
455     /* (non-Javadoc)
456      * @see org.onap.aaf.auth.dao.cass.CIDAO#get(java.lang.String, int)
457      */
458     @Override
459     public Date get(AuthzTrans trans, String table, int seg) {
460         Date[] dates = info.get(table);
461         if (dates==null) {
462             dates = new Date[seg+1];
463             touch(trans,table, seg);
464         } else if (dates.length<=seg) {
465             Date[] temp = new Date[seg+1];
466             System.arraycopy(dates, 0, temp, 0, dates.length);
467             dates = temp;
468         }
469         Date rv = dates[seg];
470         if (rv==null) {
471             rv=dates[seg]=startTime;
472         }
473         return rv;
474     }
475
476     @Override
477     protected void wasModified(AuthzTrans trans, CRUD modified, Data data, String ... override) {
478         // Do nothing
479     }
480
481 }