2 * ============LICENSE_START====================================================
4 * ===========================================================================
5 * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
7 * Modification Copyright (c) 2019 IBM
8 * ===========================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END====================================================
24 package org.onap.aaf.auth.dao.cass;
26 import java.io.IOException;
27 import java.net.HttpURLConnection;
29 import java.util.Date;
30 import java.util.HashMap;
31 import java.util.HashSet;
33 import java.util.Map.Entry;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.ConcurrentHashMap;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.TimeUnit;
39 import org.onap.aaf.auth.dao.AbsCassDAO;
40 import org.onap.aaf.auth.dao.CIDAO;
41 import org.onap.aaf.auth.dao.CassAccess;
42 import org.onap.aaf.auth.dao.CassDAOImpl;
43 import org.onap.aaf.auth.dao.Loader;
44 import org.onap.aaf.auth.env.AuthzEnv;
45 import org.onap.aaf.auth.env.AuthzTrans;
46 import org.onap.aaf.auth.layer.Result;
47 import org.onap.aaf.cadi.CadiException;
48 import org.onap.aaf.cadi.SecuritySetter;
49 import org.onap.aaf.cadi.client.Future;
50 import org.onap.aaf.cadi.client.Rcli;
51 import org.onap.aaf.cadi.client.Retryable;
52 import org.onap.aaf.cadi.config.Config;
53 import org.onap.aaf.cadi.http.HMangr;
54 import org.onap.aaf.misc.env.APIException;
55 import org.onap.aaf.misc.env.Env;
56 import org.onap.aaf.misc.env.TimeTaken;
57 import org.onap.aaf.misc.env.Trans;
59 import com.datastax.driver.core.BoundStatement;
60 import com.datastax.driver.core.Cluster;
61 import com.datastax.driver.core.PreparedStatement;
62 import com.datastax.driver.core.ResultSet;
63 import com.datastax.driver.core.Row;
64 import com.datastax.driver.core.exceptions.DriverException;
66 public class CacheInfoDAO extends CassDAOImpl<AuthzTrans,CacheInfoDAO.Data> implements CIDAO<AuthzTrans> {
68 private static final String TABLE = "cache";
69 public static final Map<String,Date[]> info = new ConcurrentHashMap<>();
71 private static CacheUpdate cacheUpdate;
73 // Hold current time stamps from Tables
74 private final Date startTime;
75 private final boolean cacheNotify;
76 private PreparedStatement psCheck;
78 //////////////////////////////////////////
79 // Data Definition, matches Cassandra DM
80 //////////////////////////////////////////
81 private static final int KEYLIMIT = 2;
83 public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException {
84 super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
85 startTime = new Date();
86 cacheNotify = noK8s(trans);
90 public CacheInfoDAO(AuthzTrans trans, AbsCassDAO<AuthzTrans,?> aDao) throws APIException, IOException {
91 super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
92 startTime = new Date();
94 cacheNotify = noK8s(trans);
98 * Need a different point to point cache clear strategy for K8s...
102 private boolean noK8s(AuthzTrans trans) {
103 String container = trans.getProperty(Config.AAF_LOCATOR_CONTAINER);
104 return ! ("helm".equals(container) || "oom".equals(container));
109 public static class Data {
118 public Data(String name, int seg) {
125 private static class InfoLoader extends Loader<Data> {
126 public static final InfoLoader dflt = new InfoLoader(KEYLIMIT);
128 public InfoLoader(int keylimit) {
133 public Data load(Data data, Row row) {
134 // Int more efficient
135 data.name = row.getString(0);
136 data.seg = row.getInt(1);
137 data.touched = row.getTimestamp(2);
142 protected void key(Data data, int idxParam, Object[] obj) {
150 protected void body(Data data, int idx, Object[] obj) {
151 obj[idx]=data.touched;
155 public static synchronized <T extends Trans> void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
156 if (cacheUpdate==null) {
157 cacheUpdate = new CacheUpdate(env,hman,ss, ip,port);
158 Thread t= new Thread(cacheUpdate,"CacheInfo Update Thread");
164 public static<T extends Trans> void stopUpdate() {
165 if (cacheUpdate!=null) {
166 cacheUpdate.go=false;
170 private static final class CacheUpdate extends Thread {
171 public static BlockingQueue<Transfer> notifyDQ = new LinkedBlockingQueue<Transfer>(2000);
173 private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0";
174 private AuthzEnv env;
176 private SecuritySetter<HttpURLConnection> ss;
177 private final String authority;
178 public boolean go = true;
180 public CacheUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
185 this.authority = ip+':'+port;
188 private static class Transfer {
191 public Transfer(String table, int[] segs) {
196 private class CacheClear extends Retryable<Integer> {
198 private AuthzTrans trans;
202 public CacheClear(AuthzTrans trans) {
206 public void set(Entry<String, IntHolder> es) {
208 segs = es.getValue().toString();
212 public Integer code(Rcli<?> client) throws APIException, CadiException {
213 URI to = client.getURI();
214 if (!to.getAuthority().equals(authority)) {
215 Future<Void> f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT);
216 if (f.get(hman.readTimeout())) {
219 trans.error().log("Error During AAF Peer Notify",f.code(),f.body());
226 private class IntHolder {
228 HashSet<Integer> set;
230 public IntHolder(int ints[]) {
234 public void add(int[] ints) {
236 set = new HashSet<>();
238 for (int i=0;i<raw.length;++i) {
242 for (int i=0;i<ints.length;++i) {
248 public String toString() {
249 StringBuilder sb = new StringBuilder();
250 boolean first = true;
261 for (Integer i : set) {
270 return sb.toString();
278 Transfer data = notifyDQ.poll(4,TimeUnit.SECONDS);
284 CacheClear cc = null;
285 Map<String,IntHolder> gather = null;
286 AuthzTrans trans = null;
288 // Do a block poll first
291 start = System.nanoTime();
292 trans = env.newTransNoAvg();
293 cc = new CacheClear(trans);
294 gather = new HashMap<>();
296 IntHolder prev = gather.get(data.table);
298 gather.put(data.table,new IntHolder(data.segs));
302 // continue while there is data
303 } while ((data = notifyDQ.poll())!=null);
305 for (Entry<String, IntHolder> es : gather.entrySet()) {
308 if (hman.all(ss, cc, false)!=null) {
311 } catch (Exception e) {
312 trans.error().log(e, "Error on Cache Update");
315 if (env.debug().isLoggable()) {
316 float millis = (System.nanoTime()-start)/1000000f;
317 StringBuilder sb = new StringBuilder("Direct Cache Refresh: ");
318 sb.append("Updated ");
321 sb.append(" entry for ");
323 sb.append(" entries for ");
325 int peers = count<=0?0:cc.total/count;
327 sb.append(" client");
334 trans.auditTrail(0, sb, Env.REMOTE);
338 } catch (InterruptedException e1) {
340 Thread.currentThread().interrupt();
346 private void init(AuthzTrans trans) throws APIException, IOException {
347 String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt);
348 psCheck = getSession(trans).prepare(SELECT_SP + helpers[FIELD_COMMAS] + " FROM " + TABLE);
350 disable(CRUD.create);
351 disable(CRUD.delete);
355 * @see org.onap.aaf.auth.dao.cass.CIDAO#touch(org.onap.aaf.auth.env.test.AuthzTrans, java.lang.String, int)
359 public Result<Void> touch(AuthzTrans trans, String name, int ... seg) {
361 // Direct Service Cache Invalidation
363 // ConcurrentQueues are open-ended. We don't want any Memory leaks
364 // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive
365 if (cacheNotify && cacheUpdate!=null) {
367 if (!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) {
368 trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" );
370 } catch (InterruptedException e) {
371 trans.error().log("Cache Notify Queue posting was interrupted" );
372 Thread.currentThread().interrupt();
377 // Table Based Cache Invalidation (original)
379 // Note: Save time with multiple Sequence Touches, but PreparedStmt doesn't support IN
380 StringBuilder start = new StringBuilder("CacheInfoDAO Touch segments ");
383 StringBuilder sb = new StringBuilder("BEGIN BATCH\n");
384 boolean first = true;
386 sb.append(UPDATE_SP);
388 sb.append(" SET touched=dateof(now()) WHERE name = '");
390 sb.append("' AND seg = ");
400 sb.append("APPLY BATCH;");
401 TimeTaken tt = trans.start(start.toString(),Env.REMOTE);
403 getSession(trans).executeAsync(sb.toString());
404 } catch (DriverException | APIException | IOException e) {
405 reportPerhapsReset(trans,e);
406 return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
414 * @see org.onap.aaf.auth.dao.cass.CIDAO#check(org.onap.aaf.auth.env.test.AuthzTrans)
417 public Result<Void> check(AuthzTrans trans) {
419 TimeTaken tt = trans.start("Check Table Timestamps",Env.REMOTE);
421 rs = getSession(trans).execute(new BoundStatement(psCheck));
422 } catch (DriverException | APIException | IOException e) {
423 reportPerhapsReset(trans,e);
424 return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
429 String lastName = null;
431 for (Row row : rs.all()) {
432 String name = row.getString(0);
433 int seg = row.getInt(1);
434 if (!name.equals(lastName)) {
435 dates = info.get(name);
439 dates=new Date[seg+1];
440 info.put(name,dates);
441 } else if (dates.length<=seg) {
442 Date[] temp = new Date[seg+1];
443 System.arraycopy(dates, 0, temp, 0, dates.length);
445 info.put(name, dates);
447 Date temp = row.getTimestamp(2);
448 if (dates[seg]==null || dates[seg].before(temp)) {
456 * @see org.onap.aaf.auth.dao.cass.CIDAO#get(java.lang.String, int)
459 public Date get(AuthzTrans trans, String table, int seg) {
460 Date[] dates = info.get(table);
462 dates = new Date[seg+1];
463 touch(trans,table, seg);
464 } else if (dates.length<=seg) {
465 Date[] temp = new Date[seg+1];
466 System.arraycopy(dates, 0, temp, 0, dates.length);
469 Date rv = dates[seg];
471 rv=dates[seg]=startTime;
477 protected void wasModified(AuthzTrans trans, CRUD modified, Data data, String ... override) {