Post Init Service Starter
[aaf/authz.git] / auth / auth-cass / src / main / java / org / onap / aaf / auth / dao / cass / CacheInfoDAO.java
1 /**
2  * ============LICENSE_START====================================================
3  * org.onap.aaf
4  * ===========================================================================
5  * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6  * ===========================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END====================================================
19  *
20  */
21
22 package org.onap.aaf.auth.dao.cass;
23
24 import java.io.IOException;
25 import java.net.HttpURLConnection;
26 import java.net.URI;
27 import java.util.Date;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.Map;
31 import java.util.Map.Entry;
32 import java.util.concurrent.BlockingQueue;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.LinkedBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36
37 import org.onap.aaf.auth.dao.AbsCassDAO;
38 import org.onap.aaf.auth.dao.CIDAO;
39 import org.onap.aaf.auth.dao.CassAccess;
40 import org.onap.aaf.auth.dao.CassDAOImpl;
41 import org.onap.aaf.auth.dao.Loader;
42 import org.onap.aaf.auth.env.AuthzEnv;
43 import org.onap.aaf.auth.env.AuthzTrans;
44 import org.onap.aaf.auth.layer.Result;
45 import org.onap.aaf.cadi.CadiException;
46 import org.onap.aaf.cadi.SecuritySetter;
47 import org.onap.aaf.cadi.client.Future;
48 import org.onap.aaf.cadi.client.Rcli;
49 import org.onap.aaf.cadi.client.Retryable;
50 import org.onap.aaf.cadi.http.HMangr;
51 import org.onap.aaf.misc.env.APIException;
52 import org.onap.aaf.misc.env.Env;
53 import org.onap.aaf.misc.env.TimeTaken;
54 import org.onap.aaf.misc.env.Trans;
55
56 import com.datastax.driver.core.BoundStatement;
57 import com.datastax.driver.core.Cluster;
58 import com.datastax.driver.core.PreparedStatement;
59 import com.datastax.driver.core.ResultSet;
60 import com.datastax.driver.core.Row;
61 import com.datastax.driver.core.exceptions.DriverException;
62
63 public class CacheInfoDAO extends CassDAOImpl<AuthzTrans,CacheInfoDAO.Data> implements CIDAO<AuthzTrans> {
64
65     private static final String TABLE = "cache";
66     public static final Map<String,Date[]> info = new ConcurrentHashMap<>();
67
68     private static CacheUpdate cacheUpdate;
69     
70     // Hold current time stamps from Tables
71     private final Date startTime;
72     private PreparedStatement psCheck;
73     
74     public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException {
75         super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
76         startTime = new Date();
77         init(trans);
78     }
79
80     public CacheInfoDAO(AuthzTrans trans, AbsCassDAO<AuthzTrans,?> aDao) throws APIException, IOException {
81         super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
82         startTime = new Date();
83         init(trans);
84     }
85
86
87     //////////////////////////////////////////
88     // Data Definition, matches Cassandra DM
89     //////////////////////////////////////////
90     private static final int KEYLIMIT = 2;
91     /**
92      * @author Jonathan
93      */
94     public static class Data {
95         public Data() {
96             name = null;
97             touched = null;
98         }
99         public Data(String name, int seg) {
100             this.name = name;
101             this.seg = seg;
102             touched = null;
103         }
104         
105         public String        name;
106         public int            seg;
107         public Date            touched;
108     }
109
110     private static class InfoLoader extends Loader<Data> {
111         public static final InfoLoader dflt = new InfoLoader(KEYLIMIT);
112         
113         public InfoLoader(int keylimit) {
114             super(keylimit);
115         }
116         
117         @Override
118         public Data load(Data data, Row row) {
119             // Int more efficient
120             data.name = row.getString(0);
121             data.seg = row.getInt(1);
122             data.touched = row.getTimestamp(2);
123             return data;
124         }
125
126         @Override
127         protected void key(Data data, int _idx, Object[] obj) {
128                 int idx = _idx;
129
130             obj[idx]=data.name;
131             obj[++idx]=data.seg;
132         }
133
134         @Override
135         protected void body(Data data, int idx, Object[] obj) {
136             obj[idx]=data.touched;
137         }
138     }
139     
140     public static synchronized <T extends Trans> void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
141         if (cacheUpdate==null) {
142             Thread t= new Thread(cacheUpdate = new CacheUpdate(env,hman,ss, ip,port),"CacheInfo Update Thread");
143             t.setDaemon(true);
144             t.start();
145         }
146     }
147
148     public static<T extends Trans> void stopUpdate() {
149         if (cacheUpdate!=null) {
150             cacheUpdate.go=false;
151         }
152     }
153
154     private final static class CacheUpdate extends Thread {
155         public static BlockingQueue<Transfer> notifyDQ = new LinkedBlockingQueue<Transfer>(2000);
156
157         private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0";
158         private AuthzEnv env;
159         private HMangr hman;
160         private SecuritySetter<HttpURLConnection> ss;
161         private final String authority;
162         public boolean go = true;
163         
164         public CacheUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
165             this.env = env;
166             this.hman = hman;
167             this.ss = ss;
168             
169             this.authority = ip+':'+port;
170         }
171         
172         private static class Transfer {
173             public String table;
174             public int segs[];
175             public Transfer(String table, int[] segs)  {
176                 this.table = table;
177                 this.segs = segs;
178             }
179         }
180         private class CacheClear extends Retryable<Integer> {
181             public int total=0;
182             private AuthzTrans trans;
183             private String type;
184             private String segs;
185             
186             public CacheClear(AuthzTrans trans) {
187                 this.trans = trans;
188             }
189
190             public void set(Entry<String, IntHolder> es) {
191                 type = es.getKey();
192                 segs = es.getValue().toString();
193             }
194             
195         @Override
196             public Integer code(Rcli<?> client) throws APIException, CadiException {
197                 URI to = client.getURI();
198                 if (!to.getAuthority().equals(authority)) {
199                     Future<Void> f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT);
200                     if (f.get(hman.readTimeout())) {
201                         ++total;
202                     } else {
203                         trans.error().log("Error During AAF Peer Notify",f.code(),f.body());
204                     }
205                 }
206                 return total;
207             }
208         }
209         
210         private class IntHolder {
211             private int[] raw;
212             HashSet<Integer> set;
213             
214             public IntHolder(int ints[]) {
215                 raw = ints;
216                 set = null;
217             }
218             public void add(int[] ints) {
219                 if (set==null) {
220                     set = new HashSet<>();
221                     
222                     for (int i=0;i<raw.length;++i) {
223                         set.add(raw[i]);
224                     }
225                 }
226                 for (int i=0;i<ints.length;++i) {
227                     set.add(ints[i]);
228                 }
229             }
230
231             @Override
232             public String toString() {
233                 StringBuilder sb = new StringBuilder();
234                 boolean first = true;
235                 if (set==null) {
236                     for (int i : raw) {
237                         if (first) {
238                             first=false;
239                         } else {
240                             sb.append(',');
241                         }
242                         sb.append(i);
243                     }
244                 } else {
245                     for (Integer i : set) {
246                         if (first) {
247                             first=false;
248                         } else {
249                             sb.append(',');
250                         }
251                         sb.append(i);
252                     }
253                 }
254                 return sb.toString();
255             }
256         }
257         
258         @Override
259         public void run() {
260             do {
261                 try {
262                     Transfer data = notifyDQ.poll(4,TimeUnit.SECONDS);
263                     if (data==null) {
264                         continue;
265                     }
266                     
267                     int count = 0;
268                     CacheClear cc = null;
269                     Map<String,IntHolder> gather = null;
270                     AuthzTrans trans = null;
271                     long start=0;
272                     // Do a block poll first
273                     do {
274                         if (gather==null) {
275                             start = System.nanoTime();
276                             trans = env.newTransNoAvg();
277                             cc = new CacheClear(trans);
278                             gather = new HashMap<>();
279                         }
280                         IntHolder prev = gather.get(data.table);
281                         if (prev==null) {
282                             gather.put(data.table,new IntHolder(data.segs));
283                         } else {
284                             prev.add(data.segs);
285                         }
286                         // continue while there is data
287                     } while ((data = notifyDQ.poll())!=null);
288                     if (gather!=null) {
289                         for (Entry<String, IntHolder> es : gather.entrySet()) {
290                             cc.set(es);
291                             try {
292                                 if (hman.all(ss, cc, false)!=null) {
293                                     ++count;
294                                 }
295                             } catch (Exception e) {
296                                 trans.error().log(e, "Error on Cache Update");
297                             }
298                         }
299                         if (env.debug().isLoggable()) {
300                             float millis = (System.nanoTime()-start)/1000000f;
301                             StringBuilder sb = new StringBuilder("Direct Cache Refresh: ");
302                             sb.append("Updated ");
303                             sb.append(count);
304                             if (count==1) {
305                                 sb.append(" entry for ");
306                             } else { 
307                                 sb.append(" entries for ");
308                             }
309                             int peers = count<=0?0:cc.total/count;
310                             sb.append(peers);
311                             sb.append(" client");
312                             if (peers!=1) {
313                                 sb.append('s');
314                             }
315                             sb.append(" in ");
316                             sb.append(millis);
317                             sb.append("ms");
318                             trans.auditTrail(0, sb, Env.REMOTE);
319                             env.debug().log(sb);
320                         }
321                     }
322                 } catch (InterruptedException e1) {
323                     go = false;
324                     Thread.currentThread().interrupt();
325                 }
326             } while (go);
327         }
328     }
329
330     private void init(AuthzTrans trans) throws APIException, IOException {
331         
332         String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt);
333         psCheck = getSession(trans).prepare(SELECT_SP +  helpers[FIELD_COMMAS] + " FROM " + TABLE);
334
335         disable(CRUD.create);
336         disable(CRUD.delete);
337     }
338
339     /* (non-Javadoc)
340      * @see org.onap.aaf.auth.dao.cass.CIDAO#touch(org.onap.aaf.auth.env.test.AuthzTrans, java.lang.String, int)
341      */
342     
343     @Override
344     public Result<Void> touch(AuthzTrans trans, String name, int ... seg) {
345         /////////////
346         // Direct Service Cache Invalidation
347         /////////////
348         // ConcurrentQueues are open-ended.  We don't want any Memory leaks 
349         // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive
350         if (cacheUpdate!=null) {
351             try {
352                 if (!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) {
353                     trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" );
354                 }
355             } catch (InterruptedException e) {
356                 trans.error().log("Cache Notify Queue posting was interrupted" );
357                 Thread.currentThread().interrupt();
358             }
359         }
360
361         /////////////
362         // Table Based Cache Invalidation (original)
363         /////////////
364         // Note: Save time with multiple Sequence Touches, but PreparedStmt doesn't support IN
365         StringBuilder start = new StringBuilder("CacheInfoDAO Touch segments ");
366         start.append(name);
367         start.append(": ");
368         StringBuilder sb = new StringBuilder("BEGIN BATCH\n");
369         boolean first = true;
370         for (int s : seg) {
371             sb.append(UPDATE_SP);
372             sb.append(TABLE);
373             sb.append(" SET touched=dateof(now()) WHERE name = '");
374             sb.append(name);
375             sb.append("' AND seg = ");
376             sb.append(s);
377             sb.append(";\n");    
378             if (first) {
379                 first =false;
380             } else {
381                 start.append(',');
382             }
383             start.append(s);
384         }
385         sb.append("APPLY BATCH;");
386         TimeTaken tt = trans.start(start.toString(),Env.REMOTE);
387         try {
388             getSession(trans).executeAsync(sb.toString());
389         } catch (DriverException | APIException | IOException e) {
390             reportPerhapsReset(trans,e);
391             return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
392         } finally {
393             tt.done();
394         }
395         return Result.ok();
396     }
397
398     /* (non-Javadoc)
399      * @see org.onap.aaf.auth.dao.cass.CIDAO#check(org.onap.aaf.auth.env.test.AuthzTrans)
400      */
401     @Override
402     public Result<Void> check(AuthzTrans trans) {
403         ResultSet rs;
404         TimeTaken tt = trans.start("Check Table Timestamps",Env.REMOTE);
405         try {
406             rs = getSession(trans).execute(new BoundStatement(psCheck));
407         } catch (DriverException | APIException | IOException e) {
408             reportPerhapsReset(trans,e);
409             return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
410         } finally {
411             tt.done();
412         }
413         
414         String lastName = null;
415         Date[] dates = null;
416         for (Row row : rs.all()) {
417             String name = row.getString(0);
418             int seg = row.getInt(1);
419             if (!name.equals(lastName)) {
420                 dates = info.get(name);
421                 lastName=name;
422             }
423             if (dates==null) {
424                 dates=new Date[seg+1];
425                 info.put(name,dates);
426             } else if (dates.length<=seg) {
427                 Date[] temp = new Date[seg+1];
428                 System.arraycopy(dates, 0, temp, 0, dates.length);
429                 dates = temp;
430                 info.put(name, dates);
431             }
432             Date temp = row.getTimestamp(2);
433             if (dates[seg]==null || dates[seg].before(temp)) {
434                 dates[seg]=temp;
435             }
436         }
437         return Result.ok();
438     }
439     
440     /* (non-Javadoc)
441      * @see org.onap.aaf.auth.dao.cass.CIDAO#get(java.lang.String, int)
442      */
443     @Override
444     public Date get(AuthzTrans trans, String table, int seg) {
445         Date[] dates = info.get(table);
446         if (dates==null) {
447             dates = new Date[seg+1];
448             touch(trans,table, seg);
449         } else if (dates.length<=seg) {
450             Date[] temp = new Date[seg+1];
451             System.arraycopy(dates, 0, temp, 0, dates.length);
452             dates = temp;
453         }
454         Date rv = dates[seg];
455         if (rv==null) {
456             rv=dates[seg]=startTime;
457         }
458         return rv;
459     }
460
461     @Override
462     protected void wasModified(AuthzTrans trans, CRUD modified, Data data, String ... override) {
463         // Do nothing
464     }
465
466 }