2 * ============LICENSE_START====================================================
4 * ===========================================================================
5 * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
6 * ===========================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END====================================================
22 package org.onap.aaf.auth.dao;
24 import java.io.ByteArrayInputStream;
25 import java.io.DataInputStream;
26 import java.lang.reflect.Field;
27 import java.nio.ByteBuffer;
28 import java.util.List;
30 import org.onap.aaf.auth.dao.cass.Status;
31 import org.onap.aaf.auth.env.AuthzTrans;
32 import org.onap.aaf.auth.layer.Result;
33 import org.onap.aaf.misc.env.TransStore;
35 import com.datastax.driver.core.Cluster;
36 import com.datastax.driver.core.ConsistencyLevel;
37 import com.datastax.driver.core.ResultSet;
38 import com.datastax.driver.core.ResultSetFuture;
43 * Deals with the essentials of interaction with the Cassandra DataStore for all Cassandra DAOs
49 public class CassDAOImpl<TRANS extends TransStore,DATA> extends AbsCassDAO<TRANS, DATA> implements DAO<TRANS,DATA> {
// Name constant exposed to other classes; nothing in the visible part of this file reads it.
50 public static final String USER_NAME = "__USER_NAME__";
// Property names looked up by readConsistency()/writeConsistency(): first with a ".<table>"
// suffix, then without it.
51 public static final String CASS_READ_CONSISTENCY="cassandra.readConsistency";
52 public static final String CASS_WRITE_CONSISTENCY="cassandra.writeConsistency";
// Statement verb prefixes (each ends with a space). setCRUD() uses SELECT_SP and UPDATE_SP;
// CREATE_SP and DELETE_SP are not referenced by the visible code of this class.
53 protected static final String CREATE_SP = "CREATE ";
54 protected static final String UPDATE_SP = "UPDATE ";
55 protected static final String DELETE_SP = "DELETE ";
56 protected static final String SELECT_SP = "SELECT ";
// Clause separator used when setCRUD() assembles the SELECT, UPDATE and DELETE statements.
57 private static final String WHERE = " WHERE ";
// Format string for the ERR_NotImplemented result returned by the read methods.
58 private static final String READ_IS_DISABLED = "Read is disabled for %s";
// Per-instance labels built from the runtime class's simple name; they are passed as the text
// argument of the PSInfo exec/execAsync/read calls.
60 protected final String C_TEXT = getClass().getSimpleName() + " CREATE";
61 protected final String R_TEXT = getClass().getSimpleName() + " READ";
62 protected final String U_TEXT = getClass().getSimpleName() + " UPDATE";
63 protected final String D_TEXT = getClass().getSimpleName() + " DELETE";
// Consistency levels supplied to the constructors and handed to each PSInfo built in setCRUD().
66 protected final ConsistencyLevel readConsistency;
67 protected final ConsistencyLevel writeConsistency;
// One statement holder per CRUD operation. Assigned by setCRUD() and replace(); disable() sets
// them to null, which create/update/delete report as ERR_NotImplemented.
69 protected PSInfo createPS;
70 protected PSInfo readPS;
71 protected PSInfo updatePS;
72 protected PSInfo deletePS;
// When true, create/update/delete call PSInfo.execAsync instead of PSInfo.exec.
73 protected boolean async=false;
75 // Settable only by CachedDAO
76 protected Cached<?, ?> cache;
79 * A Constructor from the originating Cluster. This DAO will open the Session at need,
80 * and shutdown the session when "close()" is called.
// Constructor taking the originating Cluster: forwards trans, name, cluster, keyspace and
// dataClass to AbsCassDAO, then records the read and write consistency levels that setCRUD()
// later passes to each PSInfo.
86 public CassDAOImpl(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {
87 super(trans, name, cluster,keyspace,dataClass);
// NOTE(review): the statement between the super call and the assignments below is not visible
// in this view. 'table' is unused by every visible line, so it is presumably stored there - confirm.
89 readConsistency = read;
90 writeConsistency = write;
94 * A Constructor to share Session with other DAOs.
96 * This constructor gets the Session and Cluster information from the calling DAO, and won't
97 * touch the Session on closure.
// Constructor that reuses another DAO: forwards trans, name, aDao and dataClass to AbsCassDAO,
// then records the read and write consistency levels that setCRUD() later passes to each PSInfo.
102 public CassDAOImpl(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {
103 super(trans, name, aDao,dataClass);
// NOTE(review): the statement between the super call and the assignments below is not visible
// in this view. 'table' is unused by every visible line, so it is presumably stored there - confirm.
105 readConsistency = read;
106 writeConsistency = write;
// Takes the flag for asynchronous statement execution.
// NOTE(review): the method body is not visible in this view; presumably it stores 'bool' in the
// 'async' field that create/update/delete consult - confirm.
109 public void async(boolean bool) {
// Convenience overload: delegates to the five-argument setCRUD with max = -1. A negative max
// makes that method use every declared field of 'dc'.
113 public final String[] setCRUD(TRANS trans, String table, Class<?> dc,Loader<DATA> loader) {
114 return setCRUD(trans, table, dc, loader, -1);
// Builds the INSERT, SELECT, UPDATE and DELETE statement text for 'table' from the declared
// field names of 'dc' and installs the resulting PSInfo objects as createPS, readPS, updatePS
// and deletePS.
//
//   trans  - passed through to every PSInfo constructor
//   table  - table name, concatenated directly into the statement text
//   dc     - class whose declared fields supply the column names
//   loader - must be non-null; supplies keylimit() and is passed to every PSInfo
//   max    - number of leading fields to use; a negative value, or one that is not smaller
//            than the field count, selects all fields
//
// Returns four strings in this order: column list (sbfc), placeholder list (sbq),
// UPDATE SET list (sbup), WHERE clause (sbwc).
//
// NOTE(review): the JDK does not specify the order of Class.getDeclaredFields(), yet the column
// order here follows the array order - confirm this holds on the target JVMs.
117 public final String[] setCRUD(TRANS trans, String table, Class<?> dc,Loader<DATA> loader, int max) {
118 Field[] fields = dc.getDeclaredFields();
// Clamp: only a max inside [0, fields.length) restricts the number of fields used.
119 int end = max>=0 && max<fields.length?max:fields.length;
120 // get keylimit from a non-null Loader
121 int keylimit = loader.keylimit();
// sbfc = column names, sbq = placeholders, sbwc = WHERE clause, sbup = UPDATE SET list.
123 StringBuilder sbfc = new StringBuilder();
124 StringBuilder sbq = new StringBuilder();
125 StringBuilder sbwc = new StringBuilder();
126 StringBuilder sbup = new StringBuilder();
// NOTE(review): the conditions that decide which builder receives each field (and the appends
// to sbq) are on lines not visible in this view. Presumably they compare i with keylimit so
// that key fields feed the WHERE clause and the remaining fields feed the SET list - confirm.
129 for (int i=0;i<end;++i) {
134 sbwc.append(" AND ");
137 sbfc.append(fields[i].getName());
143 sbup.append(fields[i].getName());
147 sbwc.append(fields[i].getName());
// Writes use writeConsistency; the read statement uses readConsistency.
152 createPS = new PSInfo(trans, "INSERT INTO " + table + " ("+ sbfc +") VALUES ("+ sbq +");",loader,writeConsistency);
154 readPS = new PSInfo(trans, SELECT_SP + sbfc + " FROM " + table + WHERE + sbwc + ';',loader,readConsistency);
156 // Note: UPDATES can't compile if there are no fields besides keys... Use "Insert"
157 if (sbup.length()==0) {
158 updatePS = createPS; // the same as an insert
160 updatePS = new PSInfo(trans, UPDATE_SP + table + " SET " + sbup + WHERE + sbwc + ';',loader,writeConsistency);
163 deletePS = new PSInfo(trans, "DELETE FROM " + table + WHERE + sbwc + ';',loader,writeConsistency);
165 return new String[] {sbfc.toString(), sbq.toString(), sbup.toString(), sbwc.toString()};
// Installs 'psInfo' as the statement holder for the given CRUD operation, overwriting whatever
// setCRUD() or an earlier call assigned. (The switch header line is not visible in this view;
// the case labels below show that it dispatches on 'crud'.)
168 public void replace(CRUD crud, PSInfo psInfo) {
170 case create: createPS = psInfo; break;
171 case read: readPS = psInfo; break;
172 case update: updatePS = psInfo; break;
173 case delete: deletePS = psInfo; break;
// Turns one CRUD operation off by setting its statement holder to null. create(), update() and
// delete() test their holder for null and return ERR_NotImplemented in that case. (The switch
// header line is not visible in this view; the case labels show it dispatches on 'crud'.)
// NOTE(review): keyFrom() dereferences createPS unconditionally, so it would throw a
// NullPointerException after disable(CRUD.create) - confirm that no caller combines the two.
177 public void disable(CRUD crud) {
179 case create: createPS = null; break;
180 case read: readPS = null; break;
181 case update: updatePS = null; break;
182 case delete: deletePS = null; break;
188 * Given a DATA object, extract the individual elements from the Data into an Object Array for the
// Executes the create statement for 'data' and, on the success path, reports the change through
// wasModified(trans, CRUD.create, data) and returns Result.ok(data).
// Returns ERR_NotImplemented when createPS is null (create is disabled).
191 public Result<DATA> create(TRANS trans, DATA data) {
192 if (createPS==null) {
193 return Result.err(Result.ERR_NotImplemented,"Create is disabled for %s",getClass().getSimpleName());
// The 'async' field selects execAsync (ResultSetFuture) over the blocking exec (ResultSet).
// NOTE(review): the checks guarding the two 'return Result.err(rs)' statements are on lines not
// visible in this view; presumably they test 'rs' for failure - confirm.
195 if (async) /*ResultSetFuture */ {
196 Result<ResultSetFuture> rs = createPS.execAsync(trans, C_TEXT, data);
198 return Result.err(rs);
201 Result<ResultSet> rs = createPS.exec(trans, C_TEXT, data);
203 return Result.err(rs);
206 wasModified(trans, CRUD.create, data);
207 return Result.ok(data);
211 * Read the Unique Row associated with Full Keys
// Reads through readPS using 'data' as the source of the key values, returning whatever
// PSInfo.read yields.
// NOTE(review): the guard around the ERR_NotImplemented return is on a line not visible in this
// view; by analogy with create/update/delete it presumably tests readPS for null - confirm.
213 public Result<List<DATA>> read(TRANS trans, DATA data) {
215 return Result.err(Result.ERR_NotImplemented,READ_IS_DISABLED,getClass().getSimpleName());
217 return readPS.read(trans, R_TEXT, data);
// Reads through readPS using explicit key values, returning whatever PSInfo.read yields.
// NOTE(review): the guard around the ERR_NotImplemented return is on a line not visible in this
// view; by analogy with create/update/delete it presumably tests readPS for null - confirm.
220 public Result<List<DATA>> read(TRANS trans, Object ... key) {
222 return Result.err(Result.ERR_NotImplemented,READ_IS_DISABLED,getClass().getSimpleName());
224 return readPS.read(trans, R_TEXT, key);
// Reads by key and returns a single DATA element (the first of the list) rather than the list.
// Visible outcomes: ERR_NotImplemented (read disabled), ERR_NotFound carrying the read's
// details, Result.ok(first element), or the read's own error passed through Result.err(rld).
// NOTE(review): every branch condition of this method is on a line not visible in this view.
// Presumably the order is: readPS null check, read succeeded, result list empty - confirm.
227 public Result<DATA> readPrimKey(TRANS trans, Object ... key) {
229 return Result.err(Result.ERR_NotImplemented,READ_IS_DISABLED,getClass().getSimpleName());
231 Result<List<DATA>> rld = readPS.read(trans, R_TEXT, key);
234 return Result.err(Result.ERR_NotFound,rld.details);
236 return Result.ok(rld.value.get(0));
239 return Result.err(rld);
// Convenience overload: updates 'data' using this DAO's current 'async' field as the mode.
243 public Result<Void> update(TRANS trans, DATA data) {
244 return update(trans, data, async);
// Executes the update statement for 'data', then reports the change through
// wasModified(trans, CRUD.update, data).
// Returns ERR_NotImplemented when updatePS is null (update is disabled).
// Note: the parameter 'async' shadows the field of the same name, so the caller's argument -
// not the DAO-wide setting - chooses between execAsync and exec here.
247 public Result<Void> update(TRANS trans, DATA data, boolean async) {
248 if (updatePS==null) {
249 return Result.err(Result.ERR_NotImplemented,"Update is disabled for %s",getClass().getSimpleName());
// NOTE(review): the checks guarding the two 'return Result.err(rs)' statements and the final
// return are on lines not visible in this view; presumably failure checks on 'rs' followed by
// a success result - confirm.
251 if (async)/* ResultSet rs =*/ {
252 Result<ResultSetFuture> rs = updatePS.execAsync(trans, U_TEXT, data);
254 return Result.err(rs);
257 Result<ResultSet> rs = updatePS.exec(trans, U_TEXT, data);
259 return Result.err(rs);
263 wasModified(trans, CRUD.update, data);
267 // This method Sig for Cached...
// Deletes through deletePS. Two paths are visible:
//   - re-read path: read(trans, data) first, then delete every row in the read result and call
//     wasModified(trans, CRUD.delete, d) for each one;
//   - direct path: delete 'data' as given and call wasModified(trans, CRUD.delete, data).
// Returns ERR_NotImplemented when deletePS is null (delete is disabled).
// NOTE(review): the branch conditions are on lines not visible in this view. Presumably the
// 'reread' parameter selects the re-read path, with a failed read returning Result.err(rd) and
// an empty read returning ERR_NotFound - confirm.
268 public Result<Void> delete(TRANS trans, DATA data, boolean reread) {
269 if (deletePS==null) {
270 return Result.err(Result.ERR_NotImplemented,"Delete is disabled for %s",getClass().getSimpleName());
272 // Since Deleting will be stored off, for possible re-constitution, need the whole thing
274 Result<List<DATA>> rd = read(trans,data);
276 return Result.err(rd);
// This return uses Status.ERR_NotFound, whereas readPrimKey() uses Result.ERR_NotFound.
279 return Result.err(Status.ERR_NotFound,"Not Found");
// A failure on one row returns at once; rows deleted in earlier iterations stay deleted.
281 for (DATA d : rd.value) {
283 Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, d);
285 return Result.err(rs);
288 Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, d);
290 return Result.err(rs);
293 wasModified(trans, CRUD.delete, d);
// Direct path: no prior read, 'data' itself supplies the key values.
296 if (async)/* ResultSet rs =*/ {
297 Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, data);
299 return Result.err(rs);
302 Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, data);
304 return Result.err(rs);
307 wasModified(trans, CRUD.delete, data);
// Returns the key values of 'data' as an Object array by delegating to createPS.keyFrom(data).
// NOTE(review): createPS is dereferenced without a null check, and disable(CRUD.create) sets it
// to null - this would throw a NullPointerException on a DAO whose create is disabled. Confirm
// whether such DAOs ever reach this method.
312 public final Object[] keyFrom(DATA data) {
313 return createPS.keyFrom(data);
// Accessor returning a String.
// NOTE(review): the method body is not visible in this view; presumably it returns the table
// name given to the constructor - confirm.
317 public String table() {
// Resolves the read consistency level for 'table' from transaction properties: looks up
// "cassandra.readConsistency.<table>" first, then "cassandra.readConsistency"; one visible
// branch returns ConsistencyLevel.ONE, the other parses the property with valueOf.
// NOTE(review): the conditions between these statements are on lines not visible in this view;
// presumably each tests 'prop' for null, so ONE is the fallback when neither property is set.
// NOTE(review): valueOf presumably throws IllegalArgumentException for a value that is not an
// exact ConsistencyLevel constant name (standard enum behaviour) - confirm whether configured
// values are validated before reaching this method.
321 protected static ConsistencyLevel readConsistency(AuthzTrans trans, String table) {
322 String prop = trans.getProperty(CASS_READ_CONSISTENCY+'.'+table);
324 prop = trans.getProperty(CASS_READ_CONSISTENCY);
326 return ConsistencyLevel.ONE; // this is Cassandra Default
329 return ConsistencyLevel.valueOf(prop);
// Resolves the write consistency level for 'table' from transaction properties: looks up
// "cassandra.writeConsistency.<table>" first, then "cassandra.writeConsistency"; one visible
// branch returns ConsistencyLevel.ONE, the other parses the property with valueOf.
// NOTE(review): the conditions between these statements are on lines not visible in this view;
// presumably each tests 'prop' for null, so ONE is the fallback when neither property is set.
// NOTE(review): valueOf presumably throws IllegalArgumentException for a value that is not an
// exact ConsistencyLevel constant name (standard enum behaviour) - confirm whether configured
// values are validated before reaching this method.
332 protected static ConsistencyLevel writeConsistency(AuthzTrans trans, String table) {
333 String prop = trans.getProperty(CASS_WRITE_CONSISTENCY+'.'+table);
335 prop = trans.getProperty(CASS_WRITE_CONSISTENCY);
337 return ConsistencyLevel.ONE; // this is Cassandra Default
340 return ConsistencyLevel.valueOf(prop);
343 public static DataInputStream toDIS(ByteBuffer bb) {
344 byte[] b = bb.array();
345 return new DataInputStream(
346 new ByteArrayInputStream(b,bb.position(),bb.limit())