public static final String CONSISTENCY_LEVEL_ONE = "ONE";
public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
+ public static final String CONSISTENCY_LEVEL_SERIAL = "SERIAL";
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
}
+ else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_SERIAL)) {
+ statement.setConsistencyLevel(ConsistencyLevel.SERIAL);
+ }
results = session.execute(statement);
/**
* This method performs DDL operations on Cassandra using consistency level ONE.
- *
+ *
* @param queryObject Object containing cassandra prepared query and values.
*/
public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
- throws MusicServiceException, MusicQueryException {
+ throws MusicServiceException, MusicQueryException {
TimeMeasureInstance.instance().enter("executeOneConsistencyGet");
try {
return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
}
}
+ /**
+ * This method performs DDL operations on Cassandra using consistency level ONE.
+ *
+ * @param queryObject Object containing cassandra prepared query and values.
+ */
+ public ResultSet executeSerialConsistencyGet(PreparedQueryObject queryObject)
+ throws MusicServiceException, MusicQueryException {
+ TimeMeasureInstance.instance().enter("executeOneConsistencyGet");
+ try {
+ return executeGet(queryObject, CONSISTENCY_LEVEL_SERIAL);
+ }
+ finally {
+ TimeMeasureInstance.instance().exit();
+ }
+ }
+
/**
*
* This method performs DDL operation on Cassandra using consistency level QUORUM.
String primaryKeyValue = splitString[2];
LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
-
- /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
- * reference*/
+
+ // No information about lock holder
+ if (currentLockHolderObject == null)
+ return;
+
+ // Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
+ // reference
long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
TimeMeasureInstance.instance().enter("isTopOfLockStore");
try {
// return failure to lock holders too early or already evicted from the lock store
- String topOfLockStoreS =
- getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+ LockObject lockObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
+
+ // TODO: differentiate between "not yet" and "no longer" in return value, so that the caller knows
+ // whether it should retry or not
+ if (lockObject == null) {
+ logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not in local lock queue yet");
+ return new ReturnType(ResultType.FAILURE, lockReference+" is not in local lock queue yet");
+ }
+
+ String topOfLockStoreS = lockObject.lockRef;
long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
long lockReferenceL = Long.parseLong(lockReference);
+ // TODO: differentiate between "not yet" and "no longer" in return value, so that the caller knows
+ // whether it should retry or not
if (lockReferenceL > topOfLockStoreL) {
- // only need to check if this is a read lock....
- if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue,
- lockReference)) {
- return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values");
- }
+ // only need to check if this is a read lock....
+ if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue,
+ lockReference)) {
+ return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values");
+ }
logger.info(EELFLoggerDelegate.applicationLogger,
lockReference + " is not the lock holder yet");
return new ReturnType(ResultType.FAILURE,
lockReference + " is not the lock holder yet");
}
-
if (lockReferenceL < topOfLockStoreL) {
logger.info(EELFLoggerDelegate.applicationLogger,
lockReference + " is no longer/or was never in the lock store queue");
ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
if (result.getResult().equals(ResultType.FAILURE))
- return result;//not top of the lock store q
+ return result; // not top of the lock store q
- //check to see if the value of the key has to be synced in case there was a forceful release
+ // check to see if the value of the key has to be synced in case there was a forceful release
String syncTable = keyspace + ".unsyncedKeys_" + table;
String query = "select * from " + syncTable + " where key='" + fullyQualifiedKey + "';";
PreparedQueryObject readQueryObject = new PreparedQueryObject();
* @param fullyQualifiedKey lockName
* @return
*/
- public String whoseTurnIsIt(String fullyQualifiedKey) {
+ public String whoseTurnIsIt(String fullyQualifiedKey) {
String[] splitString = fullyQualifiedKey.split("\\.");
String keyspace = splitString[0];
String table = splitString[1];
String primaryKeyValue = splitString[2];
try {
- return getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+ LockObject lockObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
+ return lockObject != null ? lockObject.lockRef : null;
} catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
}
try {
ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
if (result.getResult().equals(ResultType.FAILURE))
- return result;//not top of the lock store q
+ return result; // not top of the lock store q
if (conditionInfo != null)
try {
try {
ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
if(result.getResult().equals(ResultType.FAILURE))
- return null;//not top of the lock store q
+ return null; // not top of the lock store q
results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
} catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
String lockReference = createLockReference(fullyQualifiedKey);
long lockCreationTime = System.currentTimeMillis();
- ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockReference);
+ ReturnType lockAcqResult;
+
+ int tries = 0;
+ do {
+ lockAcqResult = acquireLock(fullyQualifiedKey, lockReference);
+ tries++;
+ } while (!lockAcqResult.getResult().equals(ResultType.SUCCESS));
+
long lockAcqTime = System.currentTimeMillis();
- if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
- logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockReference);
- voluntaryReleaseLock(fullyQualifiedKey, lockReference);
- return lockAcqResult;
- }
+// if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+// logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockReference);
+// voluntaryReleaseLock(fullyQualifiedKey, lockReference);
+// return lockAcqResult;
+// }
- logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockReference);
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "acquired lock with id " + lockReference + " after " + tries + "tries");
ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
queryObject, lockReference, conditionInfo);
long criticalPutTime = System.currentTimeMillis();