X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=auth%2Fauth-cass%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Faaf%2Fauth%2Fdao%2Fcass%2FCacheInfoDAO.java;h=eea3c4c27c2f896b3f68a07afed523e9d44616a7;hb=3d1706fcbe7f95830ff6fd23cf679ee55c6d0595;hp=e3f994c0af1b74887f7a42763f5222f43a0c6c6c;hpb=4b5a7d721d994a49057e9bfb403c7bff1b376660;p=aaf%2Fauthz.git diff --git a/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/cass/CacheInfoDAO.java b/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/cass/CacheInfoDAO.java index e3f994c0..eea3c4c2 100644 --- a/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/cass/CacheInfoDAO.java +++ b/auth/auth-cass/src/main/java/org/onap/aaf/auth/dao/cass/CacheInfoDAO.java @@ -3,6 +3,8 @@ * org.onap.aaf * =========================================================================== * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. + * + * Modification Copyright (c) 2019 IBM * =========================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -47,6 +49,7 @@ import org.onap.aaf.cadi.SecuritySetter; import org.onap.aaf.cadi.client.Future; import org.onap.aaf.cadi.client.Rcli; import org.onap.aaf.cadi.client.Retryable; +import org.onap.aaf.cadi.config.Config; import org.onap.aaf.cadi.http.HMangr; import org.onap.aaf.misc.env.APIException; import org.onap.aaf.misc.env.Env; @@ -69,11 +72,18 @@ public class CacheInfoDAO extends CassDAOImpl impl // Hold current time stamps from Tables private final Date startTime; + private final boolean cacheNotify; private PreparedStatement psCheck; + + ////////////////////////////////////////// + // Data Definition, matches Cassandra DM + ////////////////////////////////////////// + private static final int KEYLIMIT = 2; public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException { super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE)); startTime = new Date(); + cacheNotify = noK8s(trans); init(trans); } @@ -81,17 +91,26 @@ public class CacheInfoDAO extends CassDAOImpl impl super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE)); startTime = new Date(); init(trans); + cacheNotify = noK8s(trans); } - - ////////////////////////////////////////// - // Data Definition, matches Cassandra DM - ////////////////////////////////////////// - private static final int KEYLIMIT = 2; + /** + * Need a different point to point cache clear strategy for K8s... + * @param trans + * @return + */ + private boolean noK8s(AuthzTrans trans) { + String container = trans.getProperty(Config.AAF_LOCATOR_CONTAINER); + return ! ("helm".equals(container) || "oom".equals(container)); + } /** * @author Jonathan */ public static class Data { + public String name; + public int seg; + public Date touched; + public Data() { name = null; touched = null; @@ -101,10 +120,6 @@ public class CacheInfoDAO extends CassDAOImpl impl this.seg = seg; touched = null; } - - public String name; - public int seg; - public Date touched; } private static class InfoLoader extends Loader { @@ -124,8 +139,8 @@ public class CacheInfoDAO extends CassDAOImpl impl } @Override - protected void key(Data data, int _idx, Object[] obj) { - int idx = _idx; + protected void key(Data data, int idxParam, Object[] obj) { + int idx = idxParam; obj[idx]=data.name; obj[++idx]=data.seg; @@ -137,21 +152,22 @@ public class CacheInfoDAO extends CassDAOImpl impl } } - public static void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter ss, String ip, int port) { - if(cacheUpdate==null) { - Thread t= new Thread(cacheUpdate = new CacheUpdate(env,hman,ss, ip,port),"CacheInfo Update Thread"); + public static synchronized void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter ss, String ip, int port) { + if (cacheUpdate==null) { + cacheUpdate = new CacheUpdate(env,hman,ss, ip,port); + Thread t= new Thread(cacheUpdate,"CacheInfo Update Thread"); t.setDaemon(true); t.start(); } } public static void stopUpdate() { - if(cacheUpdate!=null) { + if (cacheUpdate!=null) { cacheUpdate.go=false; } } - private final static class CacheUpdate extends Thread { + private static final class CacheUpdate extends Thread { public static BlockingQueue notifyDQ = new LinkedBlockingQueue(2000); private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0"; @@ -195,9 +211,9 @@ public class CacheInfoDAO extends CassDAOImpl impl @Override public Integer code(Rcli client) throws APIException, CadiException { URI to = client.getURI(); - if(!to.getAuthority().equals(authority)) { + if (!to.getAuthority().equals(authority)) { Future f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT); - if(f.get(hman.readTimeout())) { + if (f.get(hman.readTimeout())) { ++total; } else { trans.error().log("Error During AAF Peer Notify",f.code(),f.body()); @@ -216,14 +232,14 @@ public class CacheInfoDAO extends CassDAOImpl impl set = null; } public void add(int[] ints) { - if(set==null) { + if (set==null) { set = new HashSet<>(); - for(int i=0;i impl public String toString() { StringBuilder sb = new StringBuilder(); boolean first = true; - if(set==null) { - for(int i : raw) { - if(first) { + if (set==null) { + for (int i : raw) { + if (first) { first=false; } else { sb.append(','); @@ -242,8 +258,8 @@ public class CacheInfoDAO extends CassDAOImpl impl sb.append(i); } } else { - for(Integer i : set) { - if(first) { + for (Integer i : set) { + if (first) { first=false; } else { sb.append(','); @@ -260,7 +276,7 @@ public class CacheInfoDAO extends CassDAOImpl impl do { try { Transfer data = notifyDQ.poll(4,TimeUnit.SECONDS); - if(data==null) { + if (data==null) { continue; } @@ -271,37 +287,37 @@ public class CacheInfoDAO extends CassDAOImpl impl long start=0; // Do a block poll first do { - if(gather==null) { + if (gather==null) { start = System.nanoTime(); trans = env.newTransNoAvg(); cc = new CacheClear(trans); gather = new HashMap<>(); } IntHolder prev = gather.get(data.table); - if(prev==null) { + if (prev==null) { gather.put(data.table,new IntHolder(data.segs)); } else { prev.add(data.segs); } // continue while there is data - } while((data = notifyDQ.poll())!=null); - if(gather!=null) { - for(Entry es : gather.entrySet()) { + } while ((data = notifyDQ.poll())!=null); + if (gather!=null) { + for (Entry es : gather.entrySet()) { cc.set(es); try { - if(hman.all(ss, cc, false)!=null) { + if (hman.all(ss, cc, false)!=null) { ++count; } } catch (Exception e) { trans.error().log(e, "Error on Cache Update"); } } - if(env.debug().isLoggable()) { + if (env.debug().isLoggable()) { float millis = (System.nanoTime()-start)/1000000f; StringBuilder sb = new StringBuilder("Direct Cache Refresh: "); sb.append("Updated "); sb.append(count); - if(count==1) { + if (count==1) { sb.append(" entry for "); } else { sb.append(" entries for "); @@ -309,7 +325,7 @@ public class CacheInfoDAO extends CassDAOImpl impl int peers = count<=0?0:cc.total/count; sb.append(peers); sb.append(" client"); - if(peers!=1) { + if (peers!=1) { sb.append('s'); } sb.append(" in "); @@ -323,12 +339,11 @@ public class CacheInfoDAO extends CassDAOImpl impl go = false; Thread.currentThread().interrupt(); } - } while(go); + } while (go); } } private void init(AuthzTrans trans) throws APIException, IOException { - String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt); psCheck = getSession(trans).prepare(SELECT_SP + helpers[FIELD_COMMAS] + " FROM " + TABLE); @@ -347,9 +362,9 @@ public class CacheInfoDAO extends CassDAOImpl impl ///////////// // ConcurrentQueues are open-ended. We don't want any Memory leaks // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive - if(cacheUpdate!=null) { + if (cacheNotify && cacheUpdate!=null) { try { - if(!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) { + if (!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) { trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" ); } } catch (InterruptedException e) { @@ -367,7 +382,7 @@ public class CacheInfoDAO extends CassDAOImpl impl start.append(": "); StringBuilder sb = new StringBuilder("BEGIN BATCH\n"); boolean first = true; - for(int s : seg) { + for (int s : seg) { sb.append(UPDATE_SP); sb.append(TABLE); sb.append(" SET touched=dateof(now()) WHERE name = '"); @@ -375,7 +390,7 @@ public class CacheInfoDAO extends CassDAOImpl impl sb.append("' AND seg = "); sb.append(s); sb.append(";\n"); - if(first) { + if (first) { first =false; } else { start.append(','); @@ -413,24 +428,24 @@ public class CacheInfoDAO extends CassDAOImpl impl String lastName = null; Date[] dates = null; - for(Row row : rs.all()) { + for (Row row : rs.all()) { String name = row.getString(0); int seg = row.getInt(1); - if(!name.equals(lastName)) { + if (!name.equals(lastName)) { dates = info.get(name); lastName=name; } - if(dates==null) { + if (dates==null) { dates=new Date[seg+1]; info.put(name,dates); - } else if(dates.length<=seg) { + } else if (dates.length<=seg) { Date[] temp = new Date[seg+1]; System.arraycopy(dates, 0, temp, 0, dates.length); dates = temp; info.put(name, dates); } Date temp = row.getTimestamp(2); - if(dates[seg]==null || dates[seg].before(temp)) { + if (dates[seg]==null || dates[seg].before(temp)) { dates[seg]=temp; } } @@ -443,16 +458,16 @@ public class CacheInfoDAO extends CassDAOImpl impl @Override public Date get(AuthzTrans trans, String table, int seg) { Date[] dates = info.get(table); - if(dates==null) { + if (dates==null) { dates = new Date[seg+1]; touch(trans,table, seg); - } else if(dates.length<=seg) { + } else if (dates.length<=seg) { Date[] temp = new Date[seg+1]; System.arraycopy(dates, 0, temp, 0, dates.length); dates = temp; } Date rv = dates[seg]; - if(rv==null) { + if (rv==null) { rv=dates[seg]=startTime; } return rv;