Fix GUI image locations
[aaf/authz.git] / auth / auth-cass / src / main / java / org / onap / aaf / auth / dao / cass / CacheInfoDAO.java
1 /**
2  * ============LICENSE_START====================================================
3  * org.onap.aaf
4  * ===========================================================================
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6  * ===========================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END====================================================
19  *
20  */
21
22 package org.onap.aaf.auth.dao.cass;
23
24 import java.io.IOException;
25 import java.net.HttpURLConnection;
26 import java.net.URI;
27 import java.util.Date;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36
37 import org.onap.aaf.auth.dao.AbsCassDAO;
38 import org.onap.aaf.auth.dao.CIDAO;
39 import org.onap.aaf.auth.dao.CassAccess;
40 import org.onap.aaf.auth.dao.CassDAOImpl;
41 import org.onap.aaf.auth.dao.Loader;
42 import org.onap.aaf.auth.env.AuthzEnv;
43 import org.onap.aaf.auth.env.AuthzTrans;
44 import org.onap.aaf.auth.layer.Result;
45 import org.onap.aaf.cadi.CadiException;
46 import org.onap.aaf.cadi.SecuritySetter;
47 import org.onap.aaf.cadi.client.Future;
48 import org.onap.aaf.cadi.client.Rcli;
49 import org.onap.aaf.cadi.client.Retryable;
50 import org.onap.aaf.cadi.config.Config;
51 import org.onap.aaf.cadi.http.HMangr;
52 import org.onap.aaf.misc.env.APIException;
53 import org.onap.aaf.misc.env.Env;
54 import org.onap.aaf.misc.env.TimeTaken;
55 import org.onap.aaf.misc.env.Trans;
56
57 import com.datastax.driver.core.BoundStatement;
58 import com.datastax.driver.core.Cluster;
59 import com.datastax.driver.core.PreparedStatement;
60 import com.datastax.driver.core.ResultSet;
61 import com.datastax.driver.core.Row;
62 import com.datastax.driver.core.exceptions.DriverException;
63
64 public class CacheInfoDAO extends CassDAOImpl<AuthzTrans,CacheInfoDAO.Data> implements CIDAO<AuthzTrans> {
65
66     private static final String TABLE = "cache";
67     public static final Map<String,Date[]> info = new ConcurrentHashMap<>();
68
69     private static CacheUpdate cacheUpdate;
70     
71     // Hold current time stamps from Tables
72     private final Date startTime;
73         private final boolean cacheNotify;
74     private PreparedStatement psCheck;
75     
76     public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException {
77         super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
78         startTime = new Date();
79         cacheNotify = noK8s(trans);
80         init(trans);
81     }
82
83         public CacheInfoDAO(AuthzTrans trans, AbsCassDAO<AuthzTrans,?> aDao) throws APIException, IOException {
84         super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
85         startTime = new Date();
86         init(trans);
87         cacheNotify = noK8s(trans);
88     }
89
90         /** 
91          * Need a different point to point cache clear strategy for K8s... 
92          * @param trans
93          * @return
94          */
95     private boolean noK8s(AuthzTrans trans) {
96         String container = trans.getProperty(Config.AAF_LOCATOR_CONTAINER);
97         return ! ("helm".equals(container) || "oom".equals(container));
98         }
99
100     //////////////////////////////////////////
101     // Data Definition, matches Cassandra DM
102     //////////////////////////////////////////
103     private static final int KEYLIMIT = 2;
104     /**
105      * @author Jonathan
106      */
107     public static class Data {
108         public Data() {
109             name = null;
110             touched = null;
111         }
112         public Data(String name, int seg) {
113             this.name = name;
114             this.seg = seg;
115             touched = null;
116         }
117         
118         public String        name;
119         public int            seg;
120         public Date            touched;
121     }
122
123     private static class InfoLoader extends Loader<Data> {
124         public static final InfoLoader dflt = new InfoLoader(KEYLIMIT);
125         
126         public InfoLoader(int keylimit) {
127             super(keylimit);
128         }
129         
130         @Override
131         public Data load(Data data, Row row) {
132             // Int more efficient
133             data.name = row.getString(0);
134             data.seg = row.getInt(1);
135             data.touched = row.getTimestamp(2);
136             return data;
137         }
138
139         @Override
140         protected void key(Data data, int _idx, Object[] obj) {
141                 int idx = _idx;
142
143             obj[idx]=data.name;
144             obj[++idx]=data.seg;
145         }
146
147         @Override
148         protected void body(Data data, int idx, Object[] obj) {
149             obj[idx]=data.touched;
150         }
151     }
152     
153     public static synchronized <T extends Trans> void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
154         if (cacheUpdate==null) {
155             Thread t= new Thread(cacheUpdate = new CacheUpdate(env,hman,ss, ip,port),"CacheInfo Update Thread");
156             t.setDaemon(true);
157             t.start();
158         }
159     }
160
161     public static<T extends Trans> void stopUpdate() {
162         if (cacheUpdate!=null) {
163             cacheUpdate.go=false;
164         }
165     }
166
167     private final static class CacheUpdate extends Thread {
168         public static BlockingQueue<Transfer> notifyDQ = new LinkedBlockingQueue<Transfer>(2000);
169
170         private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0";
171         private AuthzEnv env;
172         private HMangr hman;
173         private SecuritySetter<HttpURLConnection> ss;
174         private final String authority;
175         public boolean go = true;
176         
177         public CacheUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
178             this.env = env;
179             this.hman = hman;
180             this.ss = ss;
181             
182             this.authority = ip+':'+port;
183         }
184         
185         private static class Transfer {
186             public String table;
187             public int segs[];
188             public Transfer(String table, int[] segs)  {
189                 this.table = table;
190                 this.segs = segs;
191             }
192         }
193         private class CacheClear extends Retryable<Integer> {
194             public int total=0;
195             private AuthzTrans trans;
196             private String type;
197             private String segs;
198             
199             public CacheClear(AuthzTrans trans) {
200                 this.trans = trans;
201             }
202
203             public void set(Entry<String, IntHolder> es) {
204                 type = es.getKey();
205                 segs = es.getValue().toString();
206             }
207             
208         @Override
209             public Integer code(Rcli<?> client) throws APIException, CadiException {
210                 URI to = client.getURI();
211                 if (!to.getAuthority().equals(authority)) {
212                     Future<Void> f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT);
213                     if (f.get(hman.readTimeout())) {
214                         ++total;
215                     } else {
216                         trans.error().log("Error During AAF Peer Notify",f.code(),f.body());
217                     }
218                 }
219                 return total;
220             }
221         }
222         
223         private class IntHolder {
224             private int[] raw;
225             HashSet<Integer> set;
226             
227             public IntHolder(int ints[]) {
228                 raw = ints;
229                 set = null;
230             }
231             public void add(int[] ints) {
232                 if (set==null) {
233                     set = new HashSet<>();
234                     
235                     for (int i=0;i<raw.length;++i) {
236                         set.add(raw[i]);
237                     }
238                 }
239                 for (int i=0;i<ints.length;++i) {
240                     set.add(ints[i]);
241                 }
242             }
243
244             @Override
245             public String toString() {
246                 StringBuilder sb = new StringBuilder();
247                 boolean first = true;
248                 if (set==null) {
249                     for (int i : raw) {
250                         if (first) {
251                             first=false;
252                         } else {
253                             sb.append(',');
254                         }
255                         sb.append(i);
256                     }
257                 } else {
258                     for (Integer i : set) {
259                         if (first) {
260                             first=false;
261                         } else {
262                             sb.append(',');
263                         }
264                         sb.append(i);
265                     }
266                 }
267                 return sb.toString();
268             }
269         }
270         
271         @Override
272         public void run() {
273             do {
274                 try {
275                     Transfer data = notifyDQ.poll(4,TimeUnit.SECONDS);
276                     if (data==null) {
277                         continue;
278                     }
279                     
280                     int count = 0;
281                     CacheClear cc = null;
282                     Map<String,IntHolder> gather = null;
283                     AuthzTrans trans = null;
284                     long start=0;
285                     // Do a block poll first
286                     do {
287                         if (gather==null) {
288                             start = System.nanoTime();
289                             trans = env.newTransNoAvg();
290                             cc = new CacheClear(trans);
291                             gather = new HashMap<>();
292                         }
293                         IntHolder prev = gather.get(data.table);
294                         if (prev==null) {
295                             gather.put(data.table,new IntHolder(data.segs));
296                         } else {
297                             prev.add(data.segs);
298                         }
299                         // continue while there is data
300                     } while ((data = notifyDQ.poll())!=null);
301                     if (gather!=null) {
302                         for (Entry<String, IntHolder> es : gather.entrySet()) {
303                             cc.set(es);
304                             try {
305                                 if (hman.all(ss, cc, false)!=null) {
306                                     ++count;
307                                 }
308                             } catch (Exception e) {
309                                 trans.error().log(e, "Error on Cache Update");
310                             }
311                         }
312                         if (env.debug().isLoggable()) {
313                             float millis = (System.nanoTime()-start)/1000000f;
314                             StringBuilder sb = new StringBuilder("Direct Cache Refresh: ");
315                             sb.append("Updated ");
316                             sb.append(count);
317                             if (count==1) {
318                                 sb.append(" entry for ");
319                             } else { 
320                                 sb.append(" entries for ");
321                             }
322                             int peers = count<=0?0:cc.total/count;
323                             sb.append(peers);
324                             sb.append(" client");
325                             if (peers!=1) {
326                                 sb.append('s');
327                             }
328                             sb.append(" in ");
329                             sb.append(millis);
330                             sb.append("ms");
331                             trans.auditTrail(0, sb, Env.REMOTE);
332                             env.debug().log(sb);
333                         }
334                     }
335                 } catch (InterruptedException e1) {
336                     go = false;
337                     Thread.currentThread().interrupt();
338                 }
339             } while (go);
340         }
341     }
342
343     private void init(AuthzTrans trans) throws APIException, IOException {
344         String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt);
345         psCheck = getSession(trans).prepare(SELECT_SP +  helpers[FIELD_COMMAS] + " FROM " + TABLE);
346
347         disable(CRUD.create);
348         disable(CRUD.delete);
349     }
350
351     /* (non-Javadoc)
352      * @see org.onap.aaf.auth.dao.cass.CIDAO#touch(org.onap.aaf.auth.env.test.AuthzTrans, java.lang.String, int)
353      */
354     
355     @Override
356     public Result<Void> touch(AuthzTrans trans, String name, int ... seg) {
357         /////////////
358         // Direct Service Cache Invalidation
359         /////////////
360         // ConcurrentQueues are open-ended.  We don't want any Memory leaks 
361         // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive
362         if (cacheNotify && cacheUpdate!=null) {
363             try {
364                 if (!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) {
365                     trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" );
366                 }
367             } catch (InterruptedException e) {
368                 trans.error().log("Cache Notify Queue posting was interrupted" );
369                 Thread.currentThread().interrupt();
370             }
371         }
372
373         /////////////
374         // Table Based Cache Invalidation (original)
375         /////////////
376         // Note: Save time with multiple Sequence Touches, but PreparedStmt doesn't support IN
377         StringBuilder start = new StringBuilder("CacheInfoDAO Touch segments ");
378         start.append(name);
379         start.append(": ");
380         StringBuilder sb = new StringBuilder("BEGIN BATCH\n");
381         boolean first = true;
382         for (int s : seg) {
383             sb.append(UPDATE_SP);
384             sb.append(TABLE);
385             sb.append(" SET touched=dateof(now()) WHERE name = '");
386             sb.append(name);
387             sb.append("' AND seg = ");
388             sb.append(s);
389             sb.append(";\n");    
390             if (first) {
391                 first =false;
392             } else {
393                 start.append(',');
394             }
395             start.append(s);
396         }
397         sb.append("APPLY BATCH;");
398         TimeTaken tt = trans.start(start.toString(),Env.REMOTE);
399         try {
400             getSession(trans).executeAsync(sb.toString());
401         } catch (DriverException | APIException | IOException e) {
402             reportPerhapsReset(trans,e);
403             return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
404         } finally {
405             tt.done();
406         }
407         return Result.ok();
408     }
409
410     /* (non-Javadoc)
411      * @see org.onap.aaf.auth.dao.cass.CIDAO#check(org.onap.aaf.auth.env.test.AuthzTrans)
412      */
413     @Override
414     public Result<Void> check(AuthzTrans trans) {
415         ResultSet rs;
416         TimeTaken tt = trans.start("Check Table Timestamps",Env.REMOTE);
417         try {
418             rs = getSession(trans).execute(new BoundStatement(psCheck));
419         } catch (DriverException | APIException | IOException e) {
420             reportPerhapsReset(trans,e);
421             return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
422         } finally {
423             tt.done();
424         }
425         
426         String lastName = null;
427         Date[] dates = null;
428         for (Row row : rs.all()) {
429             String name = row.getString(0);
430             int seg = row.getInt(1);
431             if (!name.equals(lastName)) {
432                 dates = info.get(name);
433                 lastName=name;
434             }
435             if (dates==null) {
436                 dates=new Date[seg+1];
437                 info.put(name,dates);
438             } else if (dates.length<=seg) {
439                 Date[] temp = new Date[seg+1];
440                 System.arraycopy(dates, 0, temp, 0, dates.length);
441                 dates = temp;
442                 info.put(name, dates);
443             }
444             Date temp = row.getTimestamp(2);
445             if (dates[seg]==null || dates[seg].before(temp)) {
446                 dates[seg]=temp;
447             }
448         }
449         return Result.ok();
450     }
451     
452     /* (non-Javadoc)
453      * @see org.onap.aaf.auth.dao.cass.CIDAO#get(java.lang.String, int)
454      */
455     @Override
456     public Date get(AuthzTrans trans, String table, int seg) {
457         Date[] dates = info.get(table);
458         if (dates==null) {
459             dates = new Date[seg+1];
460             touch(trans,table, seg);
461         } else if (dates.length<=seg) {
462             Date[] temp = new Date[seg+1];
463             System.arraycopy(dates, 0, temp, 0, dates.length);
464             dates = temp;
465         }
466         Date rv = dates[seg];
467         if (rv==null) {
468             rv=dates[seg]=startTime;
469         }
470         return rv;
471     }
472
473     @Override
474     protected void wasModified(AuthzTrans trans, CRUD modified, Data data, String ... override) {
475         // Do nothing
476     }
477
478 }