/*******************************************************************************
 * ============LICENSE_START====================================================
 * * ===========================================================================
 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * * ===========================================================================
 * * Licensed under the Apache License, Version 2.0 (the "License");
 * * you may not use this file except in compliance with the License.
 * * You may obtain a copy of the License at
 * *
 * *      http://www.apache.org/licenses/LICENSE-2.0
 * *
 * * Unless required by applicable law or agreed to in writing, software
 * * distributed under the License is distributed on an "AS IS" BASIS,
 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * * See the License for the specific language governing permissions and
 * * limitations under the License.
 * * ============LICENSE_END====================================================
 * *
 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 * *
 ******************************************************************************/
package com.att.dao;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.List;

import com.att.authz.env.AuthzTrans;
import com.att.authz.layer.Result;
import com.att.dao.aaf.cass.Status;
import com.att.inno.env.TransStore;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
/**
 * Handles the essentials of interaction with the Cassandra DataStore for all Cassandra DAOs.
 */
public class CassDAOImpl<TRANS extends TransStore, DATA> extends AbsCassDAO<TRANS, DATA> implements DAO<TRANS, DATA> {
    public static final String USER_NAME = "__USER_NAME__";
    protected static final String CREATE_SP = "CREATE ";
    protected static final String UPDATE_SP = "UPDATE ";
    protected static final String DELETE_SP = "DELETE ";
    protected static final String SELECT_SP = "SELECT ";

    protected final String C_TEXT = getClass().getSimpleName() + " CREATE";
    protected final String R_TEXT = getClass().getSimpleName() + " READ";
    protected final String U_TEXT = getClass().getSimpleName() + " UPDATE";
    protected final String D_TEXT = getClass().getSimpleName() + " DELETE";
    private String table;

    protected final ConsistencyLevel readConsistency, writeConsistency;

    // Settable only by CachedDAO
    protected Cached<?, ?> cache;
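
    /*
     * Typical wiring (an illustrative sketch only; ExampleDAO, ExampleLoader and the
     * "example" table are hypothetical, not part of this code base): a concrete DAO
     * extends this class, hands its table and consistency levels to the constructor,
     * and calls setCRUD(...) once so the four prepared statements get built.
     *
     *   public class ExampleDAO extends CassDAOImpl<AuthzTrans, ExampleDAO.Data> {
     *       public ExampleDAO(AuthzTrans trans, Cluster cluster, String keyspace) {
     *           super(trans, "ExampleDAO", cluster, keyspace, Data.class, "example",
     *                 readConsistency(trans, "example"), writeConsistency(trans, "example"));
     *           setCRUD(trans, "example", Data.class, new ExampleLoader());
     *       }
     *   }
     */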
    /**
     * A Constructor from the originating Cluster. This DAO will open the Session as needed,
     * and shut down the Session when "close()" is called.
     */
    public CassDAOImpl(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {
        super(trans, name, cluster, keyspace, dataClass);
        this.table = table;
        readConsistency = read;
        writeConsistency = write;
    }
    /**
     * A Constructor to share a Session with other DAOs.
     *
     * This constructor gets the Session and Cluster information from the calling DAO, and won't
     * touch the Session on closure.
     */
    public CassDAOImpl(TRANS trans, String name, AbsCassDAO<TRANS, ?> aDao, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {
        super(trans, name, aDao, dataClass);
        this.table = table;
        readConsistency = read;
        writeConsistency = write;
    }
    protected PSInfo createPS;
    protected PSInfo readPS;
    protected PSInfo updatePS;
    protected PSInfo deletePS;
    private boolean async = false;
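
    /*
     * When async is true, create/update/delete submit their statements through
     * PSInfo.execAsync(...) and only verify that the submission itself succeeded;
     * they do not wait on the returned ResultSetFuture for completion.
     */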
    public void async(boolean bool) {
        async = bool;
    }
    public final String[] setCRUD(TRANS trans, String table, Class<?> dc, Loader<DATA> loader) {
        // -1 means "use all declared fields of dc"
        return setCRUD(trans, table, dc, loader, -1);
    }
    public final String[] setCRUD(TRANS trans, String table, Class<?> dc, Loader<DATA> loader, int max) {
        Field[] fields = dc.getDeclaredFields();
        int end = max >= 0 && max < fields.length ? max : fields.length;
        // get keylimit from a non-null Loader
        int keylimit = loader.keylimit();

        StringBuilder sbfc = new StringBuilder(); // column list
        StringBuilder sbq = new StringBuilder();  // "?" markers for the INSERT
        StringBuilder sbwc = new StringBuilder(); // WHERE clause over the key columns
        StringBuilder sbup = new StringBuilder(); // SET clause over the non-key columns

        // The first keylimit() declared fields are the key; the rest are data columns.
        for (int i = 0; i < end; ++i) {
            if (i > 0) {
                sbfc.append(',');
                sbq.append(',');
                if (i < keylimit) sbwc.append(" AND ");
            }
            sbfc.append(fields[i].getName());
            sbq.append('?');
            if (i >= keylimit) {
                if (i > keylimit) sbup.append(',');
                sbup.append(fields[i].getName());
                sbup.append("=?");
            } else {
                sbwc.append(fields[i].getName());
                sbwc.append("=?");
            }
        }

        createPS = new PSInfo(trans, "INSERT INTO " + table + " (" + sbfc + ") VALUES (" + sbq + ");", loader, writeConsistency);

        readPS = new PSInfo(trans, "SELECT " + sbfc + " FROM " + table + " WHERE " + sbwc + ';', loader, readConsistency);

        // Note: UPDATEs can't compile if there are no fields besides keys... use "Insert"
        if (sbup.length() == 0) {
            updatePS = createPS; // the same as an insert
        } else {
            updatePS = new PSInfo(trans, "UPDATE " + table + " SET " + sbup + " WHERE " + sbwc + ';', loader, writeConsistency);
        }
        deletePS = new PSInfo(trans, "DELETE FROM " + table + " WHERE " + sbwc + ';', loader, writeConsistency);

        return new String[] {sbfc.toString(), sbq.toString(), sbup.toString(), sbwc.toString()};
    }
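
    /*
     * For illustration (a hypothetical "role" table whose Data class declares the fields
     * name, perm and expires, and whose Loader reports keylimit() == 1), setCRUD would
     * prepare roughly:
     *
     *   INSERT INTO role (name,perm,expires) VALUES (?,?,?);
     *   SELECT name,perm,expires FROM role WHERE name=?;
     *   UPDATE role SET perm=?,expires=? WHERE name=?;
     *   DELETE FROM role WHERE name=?;
     */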
    public void replace(CRUD crud, PSInfo psInfo) {
        switch (crud) {
            case create: createPS = psInfo; break;
            case read:   readPS = psInfo; break;
            case update: updatePS = psInfo; break;
            case delete: deletePS = psInfo; break;
        }
    }
    public void disable(CRUD crud) {
        switch (crud) {
            case create: createPS = null; break;
            case read:   readPS = null; break;
            case update: updatePS = null; break;
            case delete: deletePS = null; break;
        }
    }
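
    // Subclasses can switch individual operations off; a read-only DAO (hypothetical
    // example) could call disable(CRUD.update) and disable(CRUD.delete) in its
    // constructor, and the corresponding methods below then return ERR_NotImplemented.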
    /**
     * Given a DATA object, extract the individual elements from the Data into an Object Array
     * for the prepared CREATE statement, and execute it.
     */
    public Result<DATA> create(TRANS trans, DATA data) {
        if (createPS == null) {
            return Result.err(Result.ERR_NotImplemented, "Create is disabled for %s", getClass().getSimpleName());
        }
        if (async) /* ResultSetFuture */ {
            Result<ResultSetFuture> rs = createPS.execAsync(trans, C_TEXT, data);
            if (rs.notOK()) return Result.err(rs);
        } else {
            Result<ResultSet> rs = createPS.exec(trans, C_TEXT, data);
            if (rs.notOK()) return Result.err(rs);
        }
        wasModified(trans, CRUD.create, data);
        return Result.ok(data);
    }
    /**
     * Read the unique row associated with the full keys of the given DATA object.
     */
    public Result<List<DATA>> read(TRANS trans, DATA data) {
        if (readPS == null) {
            return Result.err(Result.ERR_NotImplemented, "Read is disabled for %s", getClass().getSimpleName());
        }
        return readPS.read(trans, R_TEXT, data);
    }
    public Result<List<DATA>> read(TRANS trans, Object... key) {
        if (readPS == null) {
            return Result.err(Result.ERR_NotImplemented, "Read is disabled for %s", getClass().getSimpleName());
        }
        return readPS.read(trans, R_TEXT, key);
    }
    public Result<Void> update(TRANS trans, DATA data) {
        if (updatePS == null) {
            return Result.err(Result.ERR_NotImplemented, "Update is disabled for %s", getClass().getSimpleName());
        }
        if (async) /* ResultSetFuture */ {
            Result<ResultSetFuture> rs = updatePS.execAsync(trans, U_TEXT, data);
            if (rs.notOK()) return Result.err(rs);
        } else {
            Result<ResultSet> rs = updatePS.exec(trans, U_TEXT, data);
            if (rs.notOK()) return Result.err(rs);
        }
        wasModified(trans, CRUD.update, data);
        return Result.ok();
    }
    // This method signature exists for CachedDAO's use.
    public Result<Void> delete(TRANS trans, DATA data, boolean reread) {
        if (deletePS == null) {
            return Result.err(Result.ERR_NotImplemented, "Delete is disabled for %s", getClass().getSimpleName());
        }
        if (reread) {
            // Since the Delete may be stored off for possible re-constitution, read the whole row(s) first
            Result<List<DATA>> rd = read(trans, data);
            if (rd.notOK()) {
                return Result.err(rd);
            }
            if (rd.isEmpty()) {
                return Result.err(Status.ERR_NotFound, "Not Found");
            }
            for (DATA d : rd.value) {
                if (async) {
                    Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, d);
                    if (rs.notOK()) return Result.err(rs);
                } else {
                    Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, d);
                    if (rs.notOK()) return Result.err(rs);
                }
                wasModified(trans, CRUD.delete, d);
            }
        } else {
            if (async) {
                Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, data);
                if (rs.notOK()) return Result.err(rs);
            } else {
                Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, data);
                if (rs.notOK()) return Result.err(rs);
            }
            wasModified(trans, CRUD.delete, data);
        }
        return Result.ok();
    }
    public final Object[] keyFrom(DATA data) {
        return createPS.keyFrom(data);
    }

    public String table() {
        return table;
    }
    public static final String CASS_READ_CONSISTENCY = "cassandra.readConsistency";
    public static final String CASS_WRITE_CONSISTENCY = "cassandra.writeConsistency";

    protected static ConsistencyLevel readConsistency(AuthzTrans trans, String table) {
        String prop = trans.getProperty(CASS_READ_CONSISTENCY + '.' + table);
        if (prop == null) {
            prop = trans.getProperty(CASS_READ_CONSISTENCY);
            if (prop == null) {
                return ConsistencyLevel.ONE; // this is the Cassandra default
            }
        }
        return ConsistencyLevel.valueOf(prop);
    }

    protected static ConsistencyLevel writeConsistency(AuthzTrans trans, String table) {
        String prop = trans.getProperty(CASS_WRITE_CONSISTENCY + '.' + table);
        if (prop == null) {
            prop = trans.getProperty(CASS_WRITE_CONSISTENCY);
            if (prop == null) {
                return ConsistencyLevel.ONE; // this is the Cassandra default
            }
        }
        return ConsistencyLevel.valueOf(prop);
    }
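
    /*
     * Example configuration (the property names are the constants above; the table name
     * and levels shown are illustrative only):
     *
     *   cassandra.readConsistency=ONE
     *   cassandra.readConsistency.role=LOCAL_QUORUM
     *   cassandra.writeConsistency=QUORUM
     *
     * The per-table property wins over the general one; if neither is set,
     * ConsistencyLevel.ONE (the Cassandra default) is used.
     */

    /**
     * Wrap a ByteBuffer column value in a DataInputStream over its backing array.
     */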
    public static DataInputStream toDIS(ByteBuffer bb) {
        byte[] b = bb.array();
        return new DataInputStream(
            new ByteArrayInputStream(b, bb.position(), bb.limit())
        );
    }