X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=auth%2Fauth-cass%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Faaf%2Fauth%2Fdao%2Fcass%2FCacheInfoDAO.java;h=4aa94866ae29b17a197598372c94cff6db33b492;hb=7e966914050e66219689001ff4ab601a49eef0ac;hp=6d9900b92edf200e7da45c098f207b1985e85033;hpb=f85f0889b3b0e5e9694afab4dd01a4a97a155188;p=aaf%2Fauthz.git diff --git a/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/cass/CacheInfoDAO.java b/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/cass/CacheInfoDAO.java index 6d9900b9..4aa94866 100644 --- a/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/cass/CacheInfoDAO.java +++ b/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/cass/CacheInfoDAO.java @@ -62,405 +62,405 @@ import com.datastax.driver.core.exceptions.DriverException; public class CacheInfoDAO extends CassDAOImpl implements CIDAO { - private static final String TABLE = "cache"; - public static final Map info = new ConcurrentHashMap<>(); + private static final String TABLE = "cache"; + public static final Map info = new ConcurrentHashMap<>(); - private static CacheUpdate cacheUpdate; - - // Hold current time stamps from Tables - private final Date startTime; - private PreparedStatement psCheck; - - public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException { - super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE)); - startTime = new Date(); - init(trans); - } + private static CacheUpdate cacheUpdate; + + // Hold current time stamps from Tables + private final Date startTime; + private PreparedStatement psCheck; + + public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException { + super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE)); + startTime = new Date(); + init(trans); + } - public CacheInfoDAO(AuthzTrans trans, AbsCassDAO aDao) throws APIException, IOException { - super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE)); - startTime = new Date(); - init(trans); - } + public CacheInfoDAO(AuthzTrans trans, AbsCassDAO aDao) throws APIException, IOException { + super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE)); + startTime = new Date(); + init(trans); + } ////////////////////////////////////////// // Data Definition, matches Cassandra DM ////////////////////////////////////////// private static final int KEYLIMIT = 2; - /** + /** * @author Jonathan */ - public static class Data { - public Data() { - name = null; - touched = null; - } - public Data(String name, int seg) { - this.name = name; - this.seg = seg; - touched = null; - } - - public String name; - public int seg; - public Date touched; + public static class Data { + public Data() { + name = null; + touched = null; + } + public Data(String name, int seg) { + this.name = name; + this.seg = seg; + touched = null; + } + + public String name; + public int seg; + public Date touched; } private static class InfoLoader extends Loader { - public static final InfoLoader dflt = new InfoLoader(KEYLIMIT); - - public InfoLoader(int keylimit) { - super(keylimit); - } - - @Override - public Data load(Data data, Row row) { - // Int more efficient - data.name = row.getString(0); - data.seg = row.getInt(1); - data.touched = row.getTimestamp(2); - return data; - } + public static final InfoLoader dflt = new InfoLoader(KEYLIMIT); + + public InfoLoader(int keylimit) { + super(keylimit); + } + + @Override + public Data load(Data data, Row row) { + // Int more efficient + data.name = row.getString(0); + data.seg = row.getInt(1); + data.touched = row.getTimestamp(2); + return data; + } - @Override - protected void key(Data data, int _idx, Object[] obj) { - int idx = _idx; + @Override + protected void key(Data data, int _idx, Object[] obj) { + int idx = _idx; - obj[idx]=data.name; - obj[++idx]=data.seg; - } + obj[idx]=data.name; + obj[++idx]=data.seg; + } - @Override - protected void body(Data data, int idx, Object[] obj) { - obj[idx]=data.touched; - } + @Override + protected void body(Data data, int idx, Object[] obj) { + obj[idx]=data.touched; + } } - public static void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter ss, String ip, int port) { - if(cacheUpdate==null) { - Thread t= new Thread(cacheUpdate = new CacheUpdate(env,hman,ss, ip,port),"CacheInfo Update Thread"); - t.setDaemon(true); - t.start(); - } - } + public static void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter ss, String ip, int port) { + if (cacheUpdate==null) { + Thread t= new Thread(cacheUpdate = new CacheUpdate(env,hman,ss, ip,port),"CacheInfo Update Thread"); + t.setDaemon(true); + t.start(); + } + } - public static void stopUpdate() { - if(cacheUpdate!=null) { - cacheUpdate.go=false; - } - } + public static void stopUpdate() { + if (cacheUpdate!=null) { + cacheUpdate.go=false; + } + } - private final static class CacheUpdate extends Thread { - public static BlockingQueue notifyDQ = new LinkedBlockingQueue(2000); + private final static class CacheUpdate extends Thread { + public static BlockingQueue notifyDQ = new LinkedBlockingQueue(2000); - private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0"; - private AuthzEnv env; - private HMangr hman; - private SecuritySetter ss; - private final String authority; - public boolean go = true; - - public CacheUpdate(AuthzEnv env, HMangr hman, SecuritySetter ss, String ip, int port) { - this.env = env; - this.hman = hman; - this.ss = ss; - - this.authority = ip+':'+port; - } - - private static class Transfer { - public String table; - public int segs[]; - public Transfer(String table, int[] segs) { - this.table = table; - this.segs = segs; - } - } - private class CacheClear extends Retryable { - public int total=0; - private AuthzTrans trans; - private String type; - private String segs; - - public CacheClear(AuthzTrans trans) { - this.trans = trans; - } + private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0"; + private AuthzEnv env; + private HMangr hman; + private SecuritySetter ss; + private final String authority; + public boolean go = true; + + public CacheUpdate(AuthzEnv env, HMangr hman, SecuritySetter ss, String ip, int port) { + this.env = env; + this.hman = hman; + this.ss = ss; + + this.authority = ip+':'+port; + } + + private static class Transfer { + public String table; + public int segs[]; + public Transfer(String table, int[] segs) { + this.table = table; + this.segs = segs; + } + } + private class CacheClear extends Retryable { + public int total=0; + private AuthzTrans trans; + private String type; + private String segs; + + public CacheClear(AuthzTrans trans) { + this.trans = trans; + } - public void set(Entry es) { - type = es.getKey(); - segs = es.getValue().toString(); - } - - @Override - public Integer code(Rcli client) throws APIException, CadiException { - URI to = client.getURI(); - if(!to.getAuthority().equals(authority)) { - Future f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT); - if(f.get(hman.readTimeout())) { - ++total; - } else { - trans.error().log("Error During AAF Peer Notify",f.code(),f.body()); - } - } - return total; - } - } - - private class IntHolder { - private int[] raw; - HashSet set; - - public IntHolder(int ints[]) { - raw = ints; - set = null; - } - public void add(int[] ints) { - if(set==null) { - set = new HashSet<>(); - - for(int i=0;i es) { + type = es.getKey(); + segs = es.getValue().toString(); + } + + @Override + public Integer code(Rcli client) throws APIException, CadiException { + URI to = client.getURI(); + if (!to.getAuthority().equals(authority)) { + Future f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT); + if (f.get(hman.readTimeout())) { + ++total; + } else { + trans.error().log("Error During AAF Peer Notify",f.code(),f.body()); + } + } + return total; + } + } + + private class IntHolder { + private int[] raw; + HashSet set; + + public IntHolder(int ints[]) { + raw = ints; + set = null; + } + public void add(int[] ints) { + if (set==null) { + set = new HashSet<>(); + + for (int i=0;i gather = null; - AuthzTrans trans = null; - long start=0; - // Do a block poll first - do { - if(gather==null) { - start = System.nanoTime(); - trans = env.newTransNoAvg(); - cc = new CacheClear(trans); - gather = new HashMap<>(); - } - IntHolder prev = gather.get(data.table); - if(prev==null) { - gather.put(data.table,new IntHolder(data.segs)); - } else { - prev.add(data.segs); - } - // continue while there is data - } while((data = notifyDQ.poll())!=null); - if(gather!=null) { - for(Entry es : gather.entrySet()) { - cc.set(es); - try { - if(hman.all(ss, cc, false)!=null) { - ++count; - } - } catch (Exception e) { - trans.error().log(e, "Error on Cache Update"); - } - } - if(env.debug().isLoggable()) { - float millis = (System.nanoTime()-start)/1000000f; - StringBuilder sb = new StringBuilder("Direct Cache Refresh: "); - sb.append("Updated "); - sb.append(count); - if(count==1) { - sb.append(" entry for "); - } else { - sb.append(" entries for "); - } - int peers = count<=0?0:cc.total/count; - sb.append(peers); - sb.append(" client"); - if(peers!=1) { - sb.append('s'); - } - sb.append(" in "); - sb.append(millis); - sb.append("ms"); - trans.auditTrail(0, sb, Env.REMOTE); - env.debug().log(sb); - } - } - } catch (InterruptedException e1) { - go = false; - Thread.currentThread().interrupt(); - } - } while(go); - } - } + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + boolean first = true; + if (set==null) { + for (int i : raw) { + if (first) { + first=false; + } else { + sb.append(','); + } + sb.append(i); + } + } else { + for (Integer i : set) { + if (first) { + first=false; + } else { + sb.append(','); + } + sb.append(i); + } + } + return sb.toString(); + } + } + + @Override + public void run() { + do { + try { + Transfer data = notifyDQ.poll(4,TimeUnit.SECONDS); + if (data==null) { + continue; + } + + int count = 0; + CacheClear cc = null; + Map gather = null; + AuthzTrans trans = null; + long start=0; + // Do a block poll first + do { + if (gather==null) { + start = System.nanoTime(); + trans = env.newTransNoAvg(); + cc = new CacheClear(trans); + gather = new HashMap<>(); + } + IntHolder prev = gather.get(data.table); + if (prev==null) { + gather.put(data.table,new IntHolder(data.segs)); + } else { + prev.add(data.segs); + } + // continue while there is data + } while ((data = notifyDQ.poll())!=null); + if (gather!=null) { + for (Entry es : gather.entrySet()) { + cc.set(es); + try { + if (hman.all(ss, cc, false)!=null) { + ++count; + } + } catch (Exception e) { + trans.error().log(e, "Error on Cache Update"); + } + } + if (env.debug().isLoggable()) { + float millis = (System.nanoTime()-start)/1000000f; + StringBuilder sb = new StringBuilder("Direct Cache Refresh: "); + sb.append("Updated "); + sb.append(count); + if (count==1) { + sb.append(" entry for "); + } else { + sb.append(" entries for "); + } + int peers = count<=0?0:cc.total/count; + sb.append(peers); + sb.append(" client"); + if (peers!=1) { + sb.append('s'); + } + sb.append(" in "); + sb.append(millis); + sb.append("ms"); + trans.auditTrail(0, sb, Env.REMOTE); + env.debug().log(sb); + } + } + } catch (InterruptedException e1) { + go = false; + Thread.currentThread().interrupt(); + } + } while (go); + } + } - private void init(AuthzTrans trans) throws APIException, IOException { - - String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt); - psCheck = getSession(trans).prepare(SELECT_SP + helpers[FIELD_COMMAS] + " FROM " + TABLE); + private void init(AuthzTrans trans) throws APIException, IOException { + + String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt); + psCheck = getSession(trans).prepare(SELECT_SP + helpers[FIELD_COMMAS] + " FROM " + TABLE); - disable(CRUD.create); - disable(CRUD.delete); - } + disable(CRUD.create); + disable(CRUD.delete); + } - /* (non-Javadoc) - * @see org.onap.aaf.auth.dao.cass.CIDAO#touch(org.onap.aaf.auth.env.test.AuthzTrans, java.lang.String, int) - */ - - @Override - public Result touch(AuthzTrans trans, String name, int ... seg) { - ///////////// - // Direct Service Cache Invalidation - ///////////// - // ConcurrentQueues are open-ended. We don't want any Memory leaks - // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive - if(cacheUpdate!=null) { - try { - if(!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) { - trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" ); - } - } catch (InterruptedException e) { - trans.error().log("Cache Notify Queue posting was interrupted" ); - Thread.currentThread().interrupt(); - } - } + /* (non-Javadoc) + * @see org.onap.aaf.auth.dao.cass.CIDAO#touch(org.onap.aaf.auth.env.test.AuthzTrans, java.lang.String, int) + */ + + @Override + public Result touch(AuthzTrans trans, String name, int ... seg) { + ///////////// + // Direct Service Cache Invalidation + ///////////// + // ConcurrentQueues are open-ended. We don't want any Memory leaks + // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive + if (cacheUpdate!=null) { + try { + if (!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) { + trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" ); + } + } catch (InterruptedException e) { + trans.error().log("Cache Notify Queue posting was interrupted" ); + Thread.currentThread().interrupt(); + } + } - ///////////// - // Table Based Cache Invalidation (original) - ///////////// - // Note: Save time with multiple Sequence Touches, but PreparedStmt doesn't support IN - StringBuilder start = new StringBuilder("CacheInfoDAO Touch segments "); - start.append(name); - start.append(": "); - StringBuilder sb = new StringBuilder("BEGIN BATCH\n"); - boolean first = true; - for(int s : seg) { - sb.append(UPDATE_SP); - sb.append(TABLE); - sb.append(" SET touched=dateof(now()) WHERE name = '"); - sb.append(name); - sb.append("' AND seg = "); - sb.append(s); - sb.append(";\n"); - if(first) { - first =false; - } else { - start.append(','); - } - start.append(s); - } - sb.append("APPLY BATCH;"); - TimeTaken tt = trans.start(start.toString(),Env.REMOTE); - try { - getSession(trans).executeAsync(sb.toString()); - } catch (DriverException | APIException | IOException e) { - reportPerhapsReset(trans,e); - return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); - } finally { - tt.done(); - } - return Result.ok(); - } + ///////////// + // Table Based Cache Invalidation (original) + ///////////// + // Note: Save time with multiple Sequence Touches, but PreparedStmt doesn't support IN + StringBuilder start = new StringBuilder("CacheInfoDAO Touch segments "); + start.append(name); + start.append(": "); + StringBuilder sb = new StringBuilder("BEGIN BATCH\n"); + boolean first = true; + for (int s : seg) { + sb.append(UPDATE_SP); + sb.append(TABLE); + sb.append(" SET touched=dateof(now()) WHERE name = '"); + sb.append(name); + sb.append("' AND seg = "); + sb.append(s); + sb.append(";\n"); + if (first) { + first =false; + } else { + start.append(','); + } + start.append(s); + } + sb.append("APPLY BATCH;"); + TimeTaken tt = trans.start(start.toString(),Env.REMOTE); + try { + getSession(trans).executeAsync(sb.toString()); + } catch (DriverException | APIException | IOException e) { + reportPerhapsReset(trans,e); + return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); + } finally { + tt.done(); + } + return Result.ok(); + } - /* (non-Javadoc) - * @see org.onap.aaf.auth.dao.cass.CIDAO#check(org.onap.aaf.auth.env.test.AuthzTrans) - */ - @Override - public Result check(AuthzTrans trans) { - ResultSet rs; - TimeTaken tt = trans.start("Check Table Timestamps",Env.REMOTE); - try { - rs = getSession(trans).execute(new BoundStatement(psCheck)); - } catch (DriverException | APIException | IOException e) { - reportPerhapsReset(trans,e); - return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); - } finally { - tt.done(); - } - - String lastName = null; - Date[] dates = null; - for(Row row : rs.all()) { - String name = row.getString(0); - int seg = row.getInt(1); - if(!name.equals(lastName)) { - dates = info.get(name); - lastName=name; - } - if(dates==null) { - dates=new Date[seg+1]; - info.put(name,dates); - } else if(dates.length<=seg) { - Date[] temp = new Date[seg+1]; - System.arraycopy(dates, 0, temp, 0, dates.length); - dates = temp; - info.put(name, dates); - } - Date temp = row.getTimestamp(2); - if(dates[seg]==null || dates[seg].before(temp)) { - dates[seg]=temp; - } - } - return Result.ok(); - } - /* (non-Javadoc) - * @see org.onap.aaf.auth.dao.cass.CIDAO#get(java.lang.String, int) - */ + * @see org.onap.aaf.auth.dao.cass.CIDAO#check(org.onap.aaf.auth.env.test.AuthzTrans) + */ @Override - public Date get(AuthzTrans trans, String table, int seg) { - Date[] dates = info.get(table); - if(dates==null) { - dates = new Date[seg+1]; - touch(trans,table, seg); - } else if(dates.length<=seg) { - Date[] temp = new Date[seg+1]; - System.arraycopy(dates, 0, temp, 0, dates.length); - dates = temp; - } - Date rv = dates[seg]; - if(rv==null) { - rv=dates[seg]=startTime; - } - return rv; - } + public Result check(AuthzTrans trans) { + ResultSet rs; + TimeTaken tt = trans.start("Check Table Timestamps",Env.REMOTE); + try { + rs = getSession(trans).execute(new BoundStatement(psCheck)); + } catch (DriverException | APIException | IOException e) { + reportPerhapsReset(trans,e); + return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); + } finally { + tt.done(); + } + + String lastName = null; + Date[] dates = null; + for (Row row : rs.all()) { + String name = row.getString(0); + int seg = row.getInt(1); + if (!name.equals(lastName)) { + dates = info.get(name); + lastName=name; + } + if (dates==null) { + dates=new Date[seg+1]; + info.put(name,dates); + } else if (dates.length<=seg) { + Date[] temp = new Date[seg+1]; + System.arraycopy(dates, 0, temp, 0, dates.length); + dates = temp; + info.put(name, dates); + } + Date temp = row.getTimestamp(2); + if (dates[seg]==null || dates[seg].before(temp)) { + dates[seg]=temp; + } + } + return Result.ok(); + } + + /* (non-Javadoc) + * @see org.onap.aaf.auth.dao.cass.CIDAO#get(java.lang.String, int) + */ + @Override + public Date get(AuthzTrans trans, String table, int seg) { + Date[] dates = info.get(table); + if (dates==null) { + dates = new Date[seg+1]; + touch(trans,table, seg); + } else if (dates.length<=seg) { + Date[] temp = new Date[seg+1]; + System.arraycopy(dates, 0, temp, 0, dates.length); + dates = temp; + } + Date rv = dates[seg]; + if (rv==null) { + rv=dates[seg]=startTime; + } + return rv; + } - @Override - protected void wasModified(AuthzTrans trans, CRUD modified, Data data, String ... override) { - // Do nothing - } + @Override + protected void wasModified(AuthzTrans trans, CRUD modified, Data data, String ... override) { + // Do nothing + } } \ No newline at end of file