+++ /dev/null
-/*******************************************************************************\r
- * ============LICENSE_START====================================================\r
- * * org.onap.aaf\r
- * * ===========================================================================\r
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * * ===========================================================================\r
- * * Licensed under the Apache License, Version 2.0 (the "License");\r
- * * you may not use this file except in compliance with the License.\r
- * * You may obtain a copy of the License at\r
- * * \r
- * * http://www.apache.org/licenses/LICENSE-2.0\r
- * * \r
- * * Unless required by applicable law or agreed to in writing, software\r
- * * distributed under the License is distributed on an "AS IS" BASIS,\r
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * * See the License for the specific language governing permissions and\r
- * * limitations under the License.\r
- * * ============LICENSE_END====================================================\r
- * *\r
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
- * *\r
- ******************************************************************************/\r
-package com.att.dao;\r
-\r
-import java.io.ByteArrayInputStream;\r
-import java.io.DataInputStream;\r
-import java.lang.reflect.Field;\r
-import java.nio.ByteBuffer;\r
-import java.util.List;\r
-\r
-import com.att.authz.env.AuthzTrans;\r
-import com.att.authz.layer.Result;\r
-import com.att.dao.aaf.cass.Status;\r
-import com.att.inno.env.TransStore;\r
-import com.datastax.driver.core.Cluster;\r
-import com.datastax.driver.core.ConsistencyLevel;\r
-import com.datastax.driver.core.ResultSet;\r
-import com.datastax.driver.core.ResultSetFuture;\r
-\r
-/**\r
- * AbsCassDAO\r
- *\r
- * Deal with the essentials of Interaction with Cassandra DataStore for all Cassandra DAOs\r
- *\r
- *\r
- * @param <DATA>\r
- */\r
-public class CassDAOImpl<TRANS extends TransStore,DATA> extends AbsCassDAO<TRANS, DATA> implements DAO<TRANS,DATA> {\r
- public static final String USER_NAME = "__USER_NAME__";\r
- protected static final String CREATE_SP = "CREATE ";\r
- protected static final String UPDATE_SP = "UPDATE ";\r
- protected static final String DELETE_SP = "DELETE ";\r
- protected static final String SELECT_SP = "SELECT ";\r
-\r
- protected final String C_TEXT = getClass().getSimpleName() + " CREATE";\r
- protected final String R_TEXT = getClass().getSimpleName() + " READ";\r
- protected final String U_TEXT = getClass().getSimpleName() + " UPDATE";\r
- protected final String D_TEXT = getClass().getSimpleName() + " DELETE";\r
- private String table;\r
- \r
- protected final ConsistencyLevel readConsistency,writeConsistency;\r
- \r
- // Setteable only by CachedDAO\r
- protected Cached<?, ?> cache;\r
-\r
- /**\r
- * A Constructor from the originating Cluster. This DAO will open the Session at need,\r
- * and shutdown the session when "close()" is called.\r
- *\r
- * @param cluster\r
- * @param keyspace\r
- * @param dataClass\r
- */\r
- public CassDAOImpl(TRANS trans, String name, Cluster cluster, String keyspace, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {\r
- super(trans, name, cluster,keyspace,dataClass);\r
- this.table = table;\r
- readConsistency = read;\r
- writeConsistency = write;\r
- }\r
- \r
- /**\r
- * A Constructor to share Session with other DAOs.\r
- *\r
- * This method get the Session and Cluster information from the calling DAO, and won't\r
- * touch the Session on closure.\r
- *\r
- * @param aDao\r
- * @param dataClass\r
- */\r
- public CassDAOImpl(TRANS trans, String name, AbsCassDAO<TRANS,?> aDao, Class<DATA> dataClass, String table, ConsistencyLevel read, ConsistencyLevel write) {\r
- super(trans, name, aDao,dataClass);\r
- this.table = table;\r
- readConsistency = read;\r
- writeConsistency = write;\r
- }\r
-\r
- protected PSInfo createPS;\r
- protected PSInfo readPS;\r
- protected PSInfo updatePS;\r
- protected PSInfo deletePS;\r
- private boolean async=false;\r
-\r
- public void async(boolean bool) {\r
- async = bool;\r
- }\r
-\r
- public final String[] setCRUD(TRANS trans, String table, Class<?> dc,Loader<DATA> loader) {\r
- return setCRUD(trans, table, dc, loader, -1);\r
- }\r
- \r
- public final String[] setCRUD(TRANS trans, String table, Class<?> dc,Loader<DATA> loader, int max) {\r
- Field[] fields = dc.getDeclaredFields();\r
- int end = max>=0 & max<fields.length?max:fields.length;\r
- // get keylimit from a non-null Loader\r
- int keylimit = loader.keylimit();\r
- \r
- StringBuilder sbfc = new StringBuilder();\r
- StringBuilder sbq = new StringBuilder();\r
- StringBuilder sbwc = new StringBuilder();\r
- StringBuilder sbup = new StringBuilder();\r
- \r
- if(keylimit>0) {\r
- for(int i=0;i<end;++i) {\r
- if(i>0) {\r
- sbfc.append(',');\r
- sbq.append(',');\r
- if(i<keylimit) {\r
- sbwc.append(" AND ");\r
- }\r
- }\r
- sbfc.append(fields[i].getName());\r
- sbq.append('?');\r
- if(i>=keylimit) {\r
- if(i>keylimit) {\r
- sbup.append(',');\r
- }\r
- sbup.append(fields[i].getName());\r
- sbup.append("=?");\r
- }\r
- if(i<keylimit) {\r
- sbwc.append(fields[i].getName());\r
- sbwc.append("=?");\r
- }\r
- }\r
- \r
- createPS = new PSInfo(trans, "INSERT INTO " + table + " ("+ sbfc +") VALUES ("+ sbq +");",loader,writeConsistency);\r
- \r
- readPS = new PSInfo(trans, "SELECT " + sbfc + " FROM " + table + " WHERE " + sbwc + ';',loader,readConsistency);\r
- \r
- // Note: UPDATES can't compile if there are no fields besides keys... Use "Insert"\r
- if(sbup.length()==0) {\r
- updatePS = createPS; // the same as an insert\r
- } else {\r
- updatePS = new PSInfo(trans, "UPDATE " + table + " SET " + sbup + " WHERE " + sbwc + ';',loader,writeConsistency);\r
- }\r
- \r
- deletePS = new PSInfo(trans, "DELETE FROM " + table + " WHERE " + sbwc + ';',loader,writeConsistency);\r
- }\r
- return new String[] {sbfc.toString(), sbq.toString(), sbup.toString(), sbwc.toString()};\r
- }\r
-\r
- public void replace(CRUD crud, PSInfo psInfo) {\r
- switch(crud) {\r
- case create: createPS = psInfo; break;\r
- case read: readPS = psInfo; break;\r
- case update: updatePS = psInfo; break;\r
- case delete: deletePS = psInfo; break;\r
- }\r
- }\r
-\r
- public void disable(CRUD crud) {\r
- switch(crud) {\r
- case create: createPS = null; break;\r
- case read: readPS = null; break;\r
- case update: updatePS = null; break;\r
- case delete: deletePS = null; break;\r
- }\r
- }\r
-\r
- \r
- /**\r
- * Given a DATA object, extract the individual elements from the Data into an Object Array for the\r
- * execute element.\r
- */\r
- public Result<DATA> create(TRANS trans, DATA data) {\r
- if(createPS==null) {\r
- Result.err(Result.ERR_NotImplemented,"Create is disabled for %s",getClass().getSimpleName());\r
- }\r
- if(async) /*ResultSetFuture */ {\r
- Result<ResultSetFuture> rs = createPS.execAsync(trans, C_TEXT, data);\r
- if(rs.notOK()) {\r
- return Result.err(rs);\r
- }\r
- } else {\r
- Result<ResultSet> rs = createPS.exec(trans, C_TEXT, data);\r
- if(rs.notOK()) {\r
- return Result.err(rs);\r
- }\r
- }\r
- wasModified(trans, CRUD.create, data);\r
- return Result.ok(data);\r
- }\r
-\r
- /**\r
- * Read the Unique Row associated with Full Keys\r
- */\r
- public Result<List<DATA>> read(TRANS trans, DATA data) {\r
- if(readPS==null) {\r
- Result.err(Result.ERR_NotImplemented,"Read is disabled for %s",getClass().getSimpleName());\r
- }\r
- return readPS.read(trans, R_TEXT, data);\r
- }\r
-\r
- public Result<List<DATA>> read(TRANS trans, Object ... key) {\r
- if(readPS==null) {\r
- Result.err(Result.ERR_NotImplemented,"Read is disabled for %s",getClass().getSimpleName());\r
- }\r
- return readPS.read(trans, R_TEXT, key);\r
- }\r
-\r
- public Result<Void> update(TRANS trans, DATA data) {\r
- if(updatePS==null) {\r
- Result.err(Result.ERR_NotImplemented,"Update is disabled for %s",getClass().getSimpleName());\r
- }\r
- if(async)/* ResultSet rs =*/ {\r
- Result<ResultSetFuture> rs = updatePS.execAsync(trans, U_TEXT, data);\r
- if(rs.notOK()) {\r
- return Result.err(rs);\r
- }\r
- } else {\r
- Result<ResultSet> rs = updatePS.exec(trans, U_TEXT, data);\r
- if(rs.notOK()) {\r
- return Result.err(rs);\r
- }\r
- }\r
- \r
- wasModified(trans, CRUD.update, data);\r
- return Result.ok();\r
- }\r
-\r
- // This method Sig for Cached...\r
- public Result<Void> delete(TRANS trans, DATA data, boolean reread) {\r
- if(deletePS==null) {\r
- Result.err(Result.ERR_NotImplemented,"Delete is disabled for %s",getClass().getSimpleName());\r
- }\r
- // Since Deleting will be stored off, for possible re-constitution, need the whole thing\r
- if(reread) {\r
- Result<List<DATA>> rd = read(trans,data);\r
- if(rd.notOK()) {\r
- return Result.err(rd);\r
- }\r
- if(rd.isEmpty()) {\r
- return Result.err(Status.ERR_NotFound,"Not Found");\r
- }\r
- for(DATA d : rd.value) { \r
- if(async) {\r
- Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, d);\r
- if(rs.notOK()) {\r
- return Result.err(rs);\r
- }\r
- } else {\r
- Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, d);\r
- if(rs.notOK()) {\r
- return Result.err(rs);\r
- }\r
- }\r
- wasModified(trans, CRUD.delete, d);\r
- }\r
- } else {\r
- if(async)/* ResultSet rs =*/ {\r
- Result<ResultSetFuture> rs = deletePS.execAsync(trans, D_TEXT, data);\r
- if(rs.notOK()) {\r
- return Result.err(rs);\r
- }\r
- } else {\r
- Result<ResultSet> rs = deletePS.exec(trans, D_TEXT, data);\r
- if(rs.notOK()) {\r
- return Result.err(rs);\r
- }\r
- }\r
- wasModified(trans, CRUD.delete, data);\r
- }\r
- return Result.ok();\r
- }\r
- \r
- public final Object[] keyFrom(DATA data) {\r
- return createPS.keyFrom(data);\r
- }\r
-\r
- @Override\r
- public String table() {\r
- return table;\r
- }\r
- \r
- public static final String CASS_READ_CONSISTENCY="cassandra.readConsistency";\r
- public static final String CASS_WRITE_CONSISTENCY="cassandra.writeConsistency";\r
- protected static ConsistencyLevel readConsistency(AuthzTrans trans, String table) {\r
- String prop = trans.getProperty(CASS_READ_CONSISTENCY+'.'+table);\r
- if(prop==null) {\r
- prop = trans.getProperty(CASS_READ_CONSISTENCY);\r
- if(prop==null) {\r
- return ConsistencyLevel.ONE; // this is Cassandra Default\r
- }\r
- }\r
- return ConsistencyLevel.valueOf(prop);\r
- }\r
-\r
- protected static ConsistencyLevel writeConsistency(AuthzTrans trans, String table) {\r
- String prop = trans.getProperty(CASS_WRITE_CONSISTENCY+'.'+table);\r
- if(prop==null) {\r
- prop = trans.getProperty(CASS_WRITE_CONSISTENCY);\r
- if(prop==null) {\r
- return ConsistencyLevel.ONE; // this is Cassandra Default\\r
- }\r
- }\r
- return ConsistencyLevel.valueOf(prop);\r
- }\r
-\r
- public static DataInputStream toDIS(ByteBuffer bb) {\r
- byte[] b = bb.array();\r
- return new DataInputStream(\r
- new ByteArrayInputStream(b,bb.position(),bb.limit())\r
- );\r
- }\r
-\r
-\r
-}\r