import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.UUID;
import com.datastax.driver.core.*;
import org.onap.music.eelf.logging.EELFLoggerDelegate;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.datastax.driver.core.utils.UUIDs;
-import com.sun.jersey.core.util.Base64;
/**
* @author nelson24
*/
public class CassaDataStore {
+ public static final String CONSISTENCY_LEVEL_ONE = "ONE";
+ public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
+
private Session session;
private Cluster cluster;
-
-
/**
* @param session
*/
}
/**
- * @param session
+ * @param
*/
public Session getSession() {
return session;
return resultMap;
}
+ /**
+ * This Method performs DDL and DML operations on Cassandra using specified consistency level outside any time-slot
+ *
+ * @param queryObject Object containing cassandra prepared query and values.
+ * @param consistency Specify consistency level for data synchronization across cassandra
+ * replicas
+ * @return Boolean Indicates operation success or failure
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public boolean executePut(PreparedQueryObject queryObject, String consistency)
+ throws MusicServiceException, MusicQueryException {
+ return executePut(queryObject, consistency, 0);
+ }
// Prepared Statements 1802 additions
/**
* This Method performs DDL and DML operations on Cassandra using specified consistency level
*
* @param queryObject Object containing cassandra prepared query and values.
- * @param consistency Specify consistency level for data synchronization across cassandra
+ * @param consistencyLevel Specify consistency level for data synchronization across cassandra
* replicas
+ * @param timeSlot Specify timestamp time-slot
* @return Boolean Indicates operation success or failure
* @throws MusicServiceException
* @throws MusicQueryException
*/
- public boolean executePut(PreparedQueryObject queryObject, String consistency)
- throws MusicServiceException, MusicQueryException {
+ public boolean executePut(PreparedQueryObject queryObject, String consistencyLevel, long timeSlot)
+ throws MusicServiceException, MusicQueryException {
boolean result = false;
+ long timeOfWrite = System.currentTimeMillis();
if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
"In preprared Execute Put: the actual insert query:"
+ queryObject.getQuery() + "; the values"
+ queryObject.getValues());
- PreparedStatement preparedInsert = null;
+ SimpleStatement statement;
try {
-
- preparedInsert = session.prepare(queryObject.getQuery());
+
+ statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
} catch(InvalidQueryException iqe) {
logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
throw new MusicQueryException(iqe.getMessage());
}
try {
- if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
+ if (consistencyLevel.equalsIgnoreCase(MusicUtil.CRITICAL)) {
logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
- preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
- } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
+ statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
+ } else if (consistencyLevel.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
- preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
+ statement.setConsistencyLevel(ConsistencyLevel.ONE);
}
- BoundStatement boundStatement = preparedInsert.bind(queryObject.getValues().toArray());
- boundStatement.setDefaultTimestamp(MusicUtil.v2sTimeStampInMicroseconds(0, System.currentTimeMillis()));
+ long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
+ statement.setDefaultTimestamp(timestamp);
- ResultSet rs = session.execute(boundStatement);
+ ResultSet rs = session.execute(statement);
result = rs.wasApplied();
}
catch (AlreadyExistsException ae) {
}
/**
- * This method performs DDL operations on Cassandra using consistency level ONE.
- *
+ * This method performs DDL operations on Cassandra using consistency specified consistency.
+ *
* @param queryObject Object containing cassandra prepared query and values.
- * @return ResultSet
- * @throws MusicServiceException
- * @throws MusicQueryException
*/
- public ResultSet executeEventualGet(PreparedQueryObject queryObject)
- throws MusicServiceException, MusicQueryException {
+ public ResultSet executeGet(PreparedQueryObject queryObject, String consistencyLevel)
+ throws MusicServiceException, MusicQueryException {
if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
- logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
- throw new MusicQueryException("Ill formed queryObject for the request = " + "["
- + queryObject.getQuery() + "]");
+ logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+ throw new MusicQueryException("Ill formed queryObject for the request = " + "["
+ + queryObject.getQuery() + "]");
}
logger.info(EELFLoggerDelegate.applicationLogger,
- "Executing Eventual get query:" + queryObject.getQuery());
-
+ "Executing Eventual get query:" + queryObject.getQuery());
+
ResultSet results = null;
try {
- PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
- preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
- results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
+ SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
+
+ if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
+ statement.setConsistencyLevel(ConsistencyLevel.ONE);
+ }
+ else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
+ statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
+ }
+
+ results = session.execute(statement);
} catch (Exception ex) {
- logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
- throw new MusicServiceException(ex.getMessage());
+ logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+ throw new MusicServiceException(ex.getMessage());
}
return results;
}
+ /**
+ * This method performs DDL operations on Cassandra using consistency level ONE.
+ *
+ * @param queryObject Object containing cassandra prepared query and values.
+ */
+ public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
+ throws MusicServiceException, MusicQueryException {
+ return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
+ }
+
/**
*
* This method performs DDL operation on Cassandra using consistency level QUORUM.
*
* @param queryObject Object containing cassandra prepared query and values.
- * @return ResultSet
- * @throws MusicServiceException
- * @throws MusicQueryException
*/
- public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
+ public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
throws MusicServiceException, MusicQueryException {
- if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
- logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
- throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
- + queryObject.getQuery() + "]");
- }
- logger.info(EELFLoggerDelegate.applicationLogger,
- "Executing Critical get query:" + queryObject.getQuery());
- PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
- preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
- ResultSet results = null;
- try {
- results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
- } catch (Exception ex) {
- logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
- throw new MusicServiceException(ex.getMessage());
- }
- return results;
-
+ return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);
}
}
String query = "select * from "+syncTable+" where key='"+fullyQualifiedKey+"';";
PreparedQueryObject readQueryObject = new PreparedQueryObject();
readQueryObject.appendQueryString(query);
- ResultSet results = getDSHandle().executeCriticalGet(readQueryObject);
+ ResultSet results = getDSHandle().executeQuorumConsistencyGet(readQueryObject);
if (results.all().size() != 0) {
logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
try {
selectQuery.addValue(cqlFormattedPrimaryKeyValue);
ResultSet results = null;
try {
- results = getDSHandle().executeCriticalGet(selectQuery);
+ results = getDSHandle().executeQuorumConsistencyGet(selectQuery);
// write it back to a quorum
Row row = results.one();
ColumnDefinitions colInfo = row.getColumnDefinitions();
public static ResultSet quorumGet(PreparedQueryObject query) {
ResultSet results = null;
try {
- results = getDSHandle().executeCriticalGet(query);
+ results = getDSHandle().executeQuorumConsistencyGet(query);
} catch (MusicServiceException | MusicQueryException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
+ e.getMessage());
}
- String query = queryObject.getQuery();
- long timeOfWrite = System.currentTimeMillis();
long lockOrdinal = Long.parseLong(lockReference);
- long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
- // TODO: use Statement instead of modifying query
- query = query.replaceFirst("SET", "USING TIMESTAMP "+ ts + " SET");
- queryObject.replaceQueryString(query);
CassaDataStore dsHandle = getDSHandle();
- dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
+ dsHandle.executePut(queryObject, MusicUtil.CRITICAL, lockOrdinal);
long end = System.currentTimeMillis();
logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
}catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
public static ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
ResultSet results = null;
try {
- results = getDSHandle().executeEventualGet(queryObject);
+ results = getDSHandle().executeOneConsistencyGet(queryObject);
} catch (MusicQueryException | MusicServiceException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
throw new MusicServiceException(e.getMessage());
ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
if(result.getResult().equals(ResultType.FAILURE))
return null;//not top of the lock store q
- results = getDSHandle().executeCriticalGet(queryObject);
+ results = getDSHandle().executeQuorumConsistencyGet(queryObject);
} catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
}