1 /*******************************************************************************
\r
2 * ============LICENSE_START====================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
23 package org.onap.aaf.dao.aaf.cass;
\r
25 import java.io.IOException;
\r
26 import java.net.HttpURLConnection;
\r
27 import java.net.URI;
\r
28 import java.util.Date;
\r
29 import java.util.HashMap;
\r
30 import java.util.HashSet;
\r
31 import java.util.Map;
\r
32 import java.util.Map.Entry;
\r
33 import java.util.concurrent.BlockingQueue;
\r
34 import java.util.concurrent.ConcurrentHashMap;
\r
35 import java.util.concurrent.LinkedBlockingQueue;
\r
36 import java.util.concurrent.TimeUnit;
\r
38 import org.onap.aaf.authz.env.AuthzEnv;
\r
39 import org.onap.aaf.authz.env.AuthzTrans;
\r
40 import org.onap.aaf.authz.layer.Result;
\r
41 import org.onap.aaf.dao.AbsCassDAO;
\r
42 import org.onap.aaf.dao.CIDAO;
\r
43 import org.onap.aaf.dao.CassAccess;
\r
44 import org.onap.aaf.dao.CassDAOImpl;
\r
45 import org.onap.aaf.dao.Loader;
\r
47 import org.onap.aaf.cadi.CadiException;
\r
48 import org.onap.aaf.cadi.SecuritySetter;
\r
49 import org.onap.aaf.cadi.client.Future;
\r
50 import org.onap.aaf.cadi.client.Rcli;
\r
51 import org.onap.aaf.cadi.client.Retryable;
\r
52 import org.onap.aaf.cadi.http.HMangr;
\r
53 import org.onap.aaf.inno.env.APIException;
\r
54 import org.onap.aaf.inno.env.Env;
\r
55 import org.onap.aaf.inno.env.TimeTaken;
\r
56 import org.onap.aaf.inno.env.Trans;
\r
57 import com.datastax.driver.core.BoundStatement;
\r
58 import com.datastax.driver.core.Cluster;
\r
59 import com.datastax.driver.core.ResultSet;
\r
60 import com.datastax.driver.core.Row;
\r
61 import com.datastax.driver.core.exceptions.DriverException;
\r
63 public class CacheInfoDAO extends CassDAOImpl<AuthzTrans,CacheInfoDAO.Data> implements CIDAO<AuthzTrans> {
\r
// Name of the Cassandra table this DAO works on; passed to the superclass constructor and used in the statements built in init().
65 private static final String TABLE = "cache";
\r
// Static map shared by all instances: key is a name string, value is an array of Dates indexed by segment number.
// check() fills it from query rows; get() reads from it.
66 public static final Map<String,Date[]> info = new ConcurrentHashMap<String,Date[]>();
\r
// The single CacheUpdate worker; null until startUpdate() creates one.
68 private static CacheUpdate cacheUpdate;
\r
// Bound SELECT statement prepared in init() and executed by check().
71 private BoundStatement check;
\r
72 // Construction time of this DAO (set in both constructors); get() assigns it into dates[seg] and returns it (the guarding condition is not visible in this extract)
\r
73 private final Date startTime;
\r
// Builds the DAO from a Cassandra Cluster and keyspace name. Forwards the simple class name, Data.class, TABLE and the
// read/write consistency looked up for TABLE to the superclass, then records the construction time in startTime.
// Declared to throw APIException and IOException. The remainder of the constructor is not visible in this extract.
75 public CacheInfoDAO(AuthzTrans trans, Cluster cluster, String keyspace) throws APIException, IOException {
\r
76 super(trans, CacheInfoDAO.class.getSimpleName(),cluster,keyspace,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
\r
77 startTime = new Date();
\r
// Builds the DAO on top of an existing AbsCassDAO (handed to the superclass in place of a cluster/keyspace pair).
// Uses the same TABLE, Data.class and consistency lookups as the other constructor and records the construction time.
// Declared to throw APIException and IOException. The remainder of the constructor is not visible in this extract.
81 public CacheInfoDAO(AuthzTrans trans, AbsCassDAO<AuthzTrans,?> aDao) throws APIException, IOException {
\r
82 super(trans, CacheInfoDAO.class.getSimpleName(),aDao,Data.class,TABLE,readConsistency(trans,TABLE), writeConsistency(trans,TABLE));
\r
83 startTime = new Date();
\r
88 //////////////////////////////////////////
\r
89 // Data Definition, matches Cassandra DM
\r
90 //////////////////////////////////////////
\r
// Value handed to the default InfoLoader; presumably the number of key columns (InfoLoader.key() writes two slots: name and seg) - TODO confirm.
91 private static final int KEYLIMIT = 2;
\r
// Plain holder for one row of the table. InfoLoader.load() fills name, seg and touched from row columns 0, 1 and 2.
94 public static class Data {
\r
// Constructor taking a name and a segment number (its body is not visible in this extract).
99 public Data(String name, int seg) {
\r
// Value of column 0.
105 public String name;
\r
// Value of column 2, read with row.getDate(2).
107 public Date touched;
\r
// Loader for Data: copies row columns into a Data object and writes Data fields into statement parameter arrays.
110 private static class InfoLoader extends Loader<Data> {
\r
// Shared default instance, constructed with KEYLIMIT.
111 public static final InfoLoader dflt = new InfoLoader(KEYLIMIT);
\r
// Constructor taking the key limit (its body is not visible in this extract).
113 public InfoLoader(int keylimit) {
\r
// Copies columns 0, 1 and 2 of the row into data.name, data.seg and data.touched.
118 public Data load(Data data, Row row) {
\r
119 // Columns are read by integer position; the original note here says the int form is more efficient
\r
120 data.name = row.getString(0);
\r
121 data.seg = row.getInt(1);
\r
122 data.touched = row.getDate(2);
\r
// Writes the key fields into obj: name first, then seg in the next slot.
// The local idx is declared on a line not visible in this extract (presumably initialised from _idx).
127 protected void key(Data data, int _idx, Object[] obj) {
\r
130 obj[idx]=data.name;
\r
131 obj[++idx]=data.seg;
\r
// Writes the non-key field (touched) into obj at position idx.
135 protected void body(Data data, int idx, Object[] obj) {
\r
136 obj[idx]=data.touched;
\r
// Creates the background cache-update worker if none exists yet: builds a CacheUpdate from the given environment,
// HTTP manager, security setter and ip/port, stores it in the static cacheUpdate field and wraps it in a Thread
// named "CacheInfo Update Thread". The call that starts the thread is not visible in this extract.
// NOTE(review): CacheUpdate itself extends Thread, yet it is passed to a second Thread as its Runnable - confirm this is intended.
// NOTE(review): the null check and the assignment of the static field are not synchronized in the visible code - confirm callers are single-threaded.
// NOTE(review): the type parameter T does not appear in the visible parameter list.
140 public static<T extends Trans> void startUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
\r
141 if(cacheUpdate==null) {
\r
142 Thread t= new Thread(cacheUpdate = new CacheUpdate(env,hman,ss, ip,port),"CacheInfo Update Thread");
\r
// Requests the background worker to stop by setting its go flag to false; does nothing when no worker was created.
// NOTE(review): go is declared as a plain, non-volatile boolean and is written here while presumably being read by the worker thread - confirm visibility is adequate.
148 public static<T extends Trans> void stopUpdate() {
\r
149 if(cacheUpdate!=null) {
\r
150 cacheUpdate.go=false;
\r
// Background worker that takes queued invalidation notices (Transfer objects) and sends cache-clear requests
// to other servers through the HTTP manager; see run() and CacheClear.
154 private final static class CacheUpdate extends Thread {
\r
// Bounded queue (capacity 2000) of pending notices; touch() offers to it and run() polls from it.
155 public static BlockingQueue<Transfer> notifyDQ = new LinkedBlockingQueue<Transfer>(2000);
\r
// Content-type string passed as the second argument of client.delete() in CacheClear.code().
157 private static final String VOID_CT="application/Void+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0";
\r
158 private AuthzEnv env;
\r
159 private HMangr hman;
\r
160 private SecuritySetter<HttpURLConnection> ss;
\r
// This server's own address as ip:port; CacheClear.code() compares it to each client's URI authority.
161 private final String authority;
\r
// Set to false by stopUpdate(). The line in run() that reads it is not visible in this extract.
162 public boolean go = true;
\r
// Receives the collaborators (their field assignments are not visible in this extract) and builds the authority string from ip and port.
164 public CacheUpdate(AuthzEnv env, HMangr hman, SecuritySetter<HttpURLConnection> ss, String ip, int port) {
\r
169 this.authority = ip+':'+port;
\r
// Queue element carrying one invalidation notice: a table name plus segment numbers.
// run() reads data.table and data.segs; the segs field declaration and assignment are not visible in this extract.
172 private static class Transfer {
\r
173 public String table;
\r
175 public Transfer(String table, int[] segs) {
\r
176 this.table = table;
\r
// Retryable unit of work that sends one cache-clear DELETE request per client; run() hands it to hman.all().
180 private class CacheClear extends Retryable<Integer> {
\r
// Counter that run() divides by the number of tables when reporting peers; the line that increments it is not visible in this extract.
181 public int total=0;
\r
182 private AuthzTrans trans;
\r
// Table name taken from the current map entry key.
183 private String type;
\r
// String form of the segment set for that table (IntHolder.toString()).
184 private String segs;
\r
186 public CacheClear(AuthzTrans trans) {
\r
187 this.trans = trans;
\r
// Selects which table and segments the next code() call will clear.
190 public void set(Entry<String, IntHolder> es) {
\r
191 type = es.getKey();
\r
192 segs = es.getValue().toString();
\r
// For a client whose URI authority differs from this server's own authority, issues DELETE on
// /mgmt/cache/<type>/<segs> and waits on the future using hman.readTimeout().
// The error log below presumably belongs to the failure branch; the success branch and the return value are not visible in this extract.
196 public Integer code(Rcli<?> client) throws APIException, CadiException {
\r
197 URI to = client.getURI();
\r
198 if(!to.getAuthority().equals(authority)) {
\r
199 Future<Void> f = client.delete("/mgmt/cache/"+type+'/'+segs,VOID_CT);
\r
200 if(f.get(hman.readTimeout())) {
\r
203 trans.error().log("Error During AAF Peer Notify",f.code(),f.body());
\r
// Collects the segment numbers gathered for one table. run() creates one per table and calls add() for further notices;
// CacheClear.set() uses toString() to build the request path.
// Most of the logic is not visible in this extract, so the comments below are limited to what the visible lines show.
210 private class IntHolder {
\r
// Set of segment numbers; created inside add().
212 HashSet<Integer> set;
\r
// Constructor taking the initial segment numbers (its body is not visible in this extract).
214 public IntHolder(int ints[]) {
\r
// Adds more segment numbers. The visible lines create the HashSet and loop over an array named raw and over ints;
// presumably raw holds the constructor's values and both are copied into the set - TODO confirm.
218 public void add(int[] ints) {
\r
220 set = new HashSet<Integer>();
\r
222 for(int i=0;i<raw.length;++i) {
\r
226 for(int i=0;i<ints.length;++i) {
\r
// Builds a string from the held integers with a StringBuilder; the separator and exact format are not visible in this extract.
232 public String toString() {
\r
233 StringBuilder sb = new StringBuilder();
\r
234 boolean first = true;
\r
245 for(Integer i : set) {
\r
254 return sb.toString();
\r
// Worker body. Visible steps: wait up to 4 seconds for a Transfer on notifyDQ, then drain the queue without blocking,
// merging the segments of each notice into one IntHolder per table (gather). For every gathered table the CacheClear
// is handed to hman.all(ss, cc, false). Exceptions from that work are logged on the transaction, and when debug
// logging is enabled a summary message with the elapsed milliseconds is built and logged.
// The outer loop, the null checks and several branches are not visible in this extract.
259 public void run() {
\r
// Blocking poll with a 4 second timeout.
262 Transfer data = notifyDQ.poll(4,TimeUnit.SECONDS);
\r
268 CacheClear cc = null;
\r
269 Map<String,IntHolder> gather = null;
\r
270 AuthzTrans trans = null;
\r
272 // Do a block poll first
\r
// Start of timing; the declaration of start is not visible in this extract.
275 start = System.nanoTime();
\r
276 trans = env.newTransNoAvg();
\r
277 cc = new CacheClear(trans);
\r
278 gather = new HashMap<String,IntHolder>();
\r
// Look up an existing holder for this table; the visible lines either put a new IntHolder or add to the previous one
// (the condition choosing between them is not visible in this extract).
280 IntHolder prev = gather.get(data.table);
\r
282 gather.put(data.table,new IntHolder(data.segs));
\r
284 prev.add(data.segs);
\r
286 // continue while there is data
\r
// Non-blocking poll: the loop ends as soon as the queue is empty.
287 } while((data = notifyDQ.poll())!=null);
\r
289 for(Entry<String, IntHolder> es : gather.entrySet()) {
\r
292 if(hman.all(ss, cc, false)!=null) {
\r
295 } catch (Exception e) {
\r
296 trans.error().log(e, "Error on Cache Update");
\r
299 if(env.debug().isLoggable()) {
\r
// Elapsed time converted from nanoseconds to milliseconds.
300 float millis = (System.nanoTime()-start)/1000000f;
\r
301 StringBuilder sb = new StringBuilder("Direct Cache Refresh: ");
\r
302 sb.append("Updated ");
\r
305 sb.append(" entry for ");
\r
307 sb.append(" entries for ");
\r
// Integer division of the CacheClear total by count, guarded against count <= 0; the declaration of count is not visible in this extract.
309 int peers = count<=0?0:cc.total/count;
\r
311 sb.append(" client");
\r
318 trans.auditTrail(0, sb, Env.REMOTE);
\r
319 env.debug().log(sb);
\r
// NOTE(review): the handler body is not visible in this extract - confirm the interrupt is handled (for example by ending the loop or restoring the interrupt status).
322 } catch (InterruptedException e1) {
\r
// init() prepares the statements of the enclosing DAO: it registers the CRUD statements for TABLE through setCRUD with the
// default InfoLoader, prepares and binds a SELECT of all fields from TABLE with no WHERE clause (stored in check),
// and disables the create and delete operations.
329 private void init(AuthzTrans trans) throws APIException, IOException {
\r
329 private void init(AuthzTrans trans) throws APIException, IOException {
\r
331 String[] helpers = setCRUD(trans, TABLE, Data.class, InfoLoader.dflt);
\r
332 check = getSession(trans).prepare(SELECT_SP + helpers[FIELD_COMMAS] + " FROM " + TABLE).bind();
\r
334 disable(CRUD.create);
\r
335 disable(CRUD.delete);
\r
339 * @see org.onap.aaf.dao.aaf.cass.CIDAO#touch(org.onap.aaf.authz.env.AuthzTrans, java.lang.String, int)
\r
343 public Result<Void> touch(AuthzTrans trans, String name, int ... seg) {
// Marks the given segments of table name as changed, in two ways visible below:
//  1. when a CacheUpdate worker exists, a Transfer is offered to its queue, waiting at most 2 seconds;
//  2. a CQL batch setting touched=dateof(now()) is built as text and submitted with executeAsync.
// Returns Result.err(ERR_Backend, ...) when the driver, API or IO call throws, otherwise Result.ok().
// Several lines (the try openers, the loop over seg, the finally block) are not visible in this extract.
\r
345 // Direct Service Cache Invalidation
\r
347 // ConcurrentQueues are open-ended. We don't want any Memory leaks
\r
348 // Note: we keep a separate counter, because "size()" on a Linked Queue is expensive
\r
349 if(cacheUpdate!=null) {
\r
// A full queue is only logged; the method still continues with the table-based invalidation.
351 if(!CacheUpdate.notifyDQ.offer(new CacheUpdate.Transfer(name, seg),2,TimeUnit.SECONDS)) {
\r
352 trans.error().log("Cache Notify Queue is not accepting messages, bouncing may be appropriate" );
\r
// NOTE(review): the visible handler only logs; confirm whether the interrupt status should be restored with Thread.currentThread().interrupt().
354 } catch (InterruptedException e) {
\r
355 trans.error().log("Cache Notify Queue posting was interrupted" );
\r
360 // Table Based Cache Invalidation (original)
\r
362 // Note: Save time with multiple Sequence Touches, but PreparedStmt doesn't support IN
\r
// Label for the timing entry started below.
363 StringBuilder start = new StringBuilder("CacheInfoDAO Touch segments ");
\r
364 start.append(name);
\r
365 start.append(": ");
\r
// NOTE(review): the batch text is assembled by string concatenation and name is presumably appended between the quotes -
// confirm name can never carry untrusted input, or bind it as a parameter.
366 StringBuilder sb = new StringBuilder("BEGIN BATCH\n");
\r
367 boolean first = true;
\r
369 sb.append(UPDATE_SP);
\r
371 sb.append(" SET touched=dateof(now()) WHERE name = '");
\r
373 sb.append("' AND seg = ");
\r
383 sb.append("APPLY BATCH;");
\r
384 TimeTaken tt = trans.start(start.toString(),Env.REMOTE);
\r
// Submitted asynchronously; the returned future is not used in the visible code.
386 getSession(trans).executeAsync(sb.toString());
\r
387 } catch (DriverException | APIException | IOException e) {
\r
388 reportPerhapsReset(trans,e);
\r
389 return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
\r
393 return Result.ok();
\r
397 * @see org.onap.aaf.dao.aaf.cass.CIDAO#check(org.onap.aaf.authz.env.AuthzTrans)
\r
400 public Result<Void> check(AuthzTrans trans) {
// Refreshes the static info map from the table: executes the prepared check statement and, for every row
// (name, seg, touched), makes sure info holds a Date array for name that is long enough for seg, then compares
// the stored date with the row's date. Returns Result.err(ERR_Backend, ...) when the query throws, otherwise Result.ok().
// Some lines (declaration of rs, the finally block, null checks, the final assignment) are not visible in this extract.
\r
402 TimeTaken tt = trans.start("Check Table Timestamps",Env.REMOTE);
\r
404 rs = getSession(trans).execute(check);
\r
405 } catch (DriverException | APIException | IOException e) {
\r
406 reportPerhapsReset(trans,e);
\r
407 return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG);
\r
// lastName and dates carry the array of the previous row so info is only consulted when the name changes.
412 String lastName = null;
\r
413 Date[] dates = null;
\r
// rs.all() fetches every row of the result before iterating.
414 for(Row row : rs.all()) {
\r
415 String name = row.getString(0);
\r
416 int seg = row.getInt(1);
\r
417 if(!name.equals(lastName)) {
\r
418 dates = info.get(name);
\r
// New array sized so that index seg is valid; presumably reached when info had no entry for name - the condition is not visible in this extract.
422 dates=new Date[seg+1];
\r
423 info.put(name,dates);
\r
// Existing array too short for this segment: copy into a larger one and store it again.
424 } else if(dates.length<=seg) {
\r
425 Date[] temp = new Date[seg+1];
\r
426 System.arraycopy(dates, 0, temp, 0, dates.length);
\r
428 info.put(name, dates);
\r
430 Date temp = row.getDate(2);
\r
// True when nothing is stored for this segment yet or the row's date is newer; the statement it guards is not visible in this extract.
431 if(dates[seg]==null || dates[seg].before(temp)) {
\r
435 return Result.ok();
\r
439 * @see org.onap.aaf.dao.aaf.cass.CIDAO#get(java.lang.String, int)
\r
442 public Date get(AuthzTrans trans, String table, int seg) {
// Looks up the Date recorded for one segment of a table in the static info map.
// Visible behaviour: when the array is created here, touch() is called for that table and segment; when the existing
// array is too short it is copied into a larger one; startTime is assigned into dates[seg] as a fallback
// (the conditions and the return statement are not visible in this extract).
// NOTE(review): no visible line stores the newly created or enlarged array back into info - confirm whether that is intended.
\r
443 Date[] dates = info.get(table);
\r
445 dates = new Date[seg+1];
\r
446 touch(trans,table, seg);
\r
447 } else if(dates.length<=seg) {
\r
448 Date[] temp = new Date[seg+1];
\r
449 System.arraycopy(dates, 0, temp, 0, dates.length);
\r
452 Date rv = dates[seg];
\r
454 rv=dates[seg]=startTime;
\r
460 protected void wasModified(AuthzTrans trans, CRUD modified, Data data, String ... override) {
\r