import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
+import java.util.concurrent.TimeUnit;
import org.onap.music.datastore.MusicDataStore;
import org.onap.music.datastore.MusicDataStoreHandle;
import org.onap.music.main.ReturnType;
import org.onap.music.service.MusicCoreService;
+import com.att.eelf.configuration.EELFLogger;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
- public String createLockReference(String fullyQualifiedKey) {
+ public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
String[] splitString = fullyQualifiedKey.split("\\.");
String keyspace = splitString[0];
String table = splitString[1];
logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
long start = System.currentTimeMillis();
String lockReference = null;
+
try {
lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
} catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
e.printStackTrace();
+ throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
+ } catch (Exception e) {
logger.error(EELFLoggerDelegate.applicationLogger, e);
+ throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
}
long end = System.currentTimeMillis();
logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
- evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
+ evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
return acquireLock(fullyQualifiedKey, lockReference);
}
String primaryKeyValue = splitString[2];
LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
-
- /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
+
+ if (currentLockHolderObject==null) { //no lock holder
+ return;
+ }
+ /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
* reference*/
-
long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.lockRef+"");
logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.lockRef+" forcibly released");
- }
+ }
}
-
+
private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
-
//return failure to lock holders too early or already evicted from the lock store
- String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+ LockObject topOfLockStore = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
+ if (topOfLockStore==null) {
+ logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
+ return new ReturnType(ResultType.FAILURE, "No lock holder!");
+ }
+
+ String topOfLockStoreS = topOfLockStore.lockRef;
long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
long lockReferenceL = Long.parseLong(lockReference);
return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
}
-
if(lockReferenceL < topOfLockStoreL) {
logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
- }
+ }
- return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
+ return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
}
public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
syncQuorum(keyspace, table, primaryKeyValue);
} catch (Exception e) {
StringWriter sw = new StringWriter();
- logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
- ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
+ ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
String exceptionAsString = sw.toString();
return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
}
- String cleanQuery = "delete from music_internal.unsynced_keys where key='"+localFullyQualifiedKey+"';";
+ String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
deleteQueryObject.appendQueryString(cleanQuery);
MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
// get the primary key d
TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
- String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
- // primary key
+ String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
+ // primary key
DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
Object cqlFormattedPrimaryKeyValue =
MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
String table = splitString[1];
String primaryKeyValue = splitString[2];
try {
- return "$" + fullyQualifiedKey + "$"
+ LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
+ if (lockOwner==null) {
+ return "No lock holder!";
+ }
+ return "$" + fullyQualifiedKey + "$"
+ getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
} catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey
}
@Override
- public void destroyLockRef(String lockId) {
+ public void destroyLockRef(String lockId) throws MusicLockingException {
long start = System.currentTimeMillis();
String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
String table = splitString[1];
String primaryKeyValue = splitString[2];
try {
- getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef);
+ getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
} catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
- logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef ,
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
+ throw new MusicLockingException(e.getMessage());
}
long end = System.currentTimeMillis();
logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
}
- public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) {
+ public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
long start = System.currentTimeMillis();
String[] splitString = fullyQualifiedKey.split("\\.");
String keyspace = splitString[0];
String table = splitString[1];
String primaryKeyValue = splitString[2];
try {
- getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference);
+ getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
} catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
- logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockReference ,
- ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
+ logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
+ ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
+ throw new MusicLockingException(e.getMessage());
}
long end = System.currentTimeMillis();
logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
}
@Override
- public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
+ public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
if (voluntaryRelease) {
}
}
- public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) {
- return destroyLockRef(fullyQualifiedKey, lockReference);
+ public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
+ MusicLockState result = null;
+ try {
+ result = destroyLockRef(fullyQualifiedKey, lockReference);
+ }
+ catch(Exception ex) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Exception in voluntaryReleaseLock() for "+ fullyQualifiedKey + "ref: " + lockReference);
+ throw new MusicLockingException(ex.getMessage());
+ }
+ return result;
}
- public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) {
+ public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
String[] splitString = fullyQualifiedKey.split("\\.");
String keyspace = splitString[0];
String table = splitString[1];
* @param lockName
* @throws MusicLockingException
*/
+ @Deprecated
public void deleteLock(String lockName) throws MusicLockingException {
- //deprecated
- }
+ throw new MusicLockingException("Depreciated Method Delete Lock");
+ }
// Prepared Query Additions.
try {
result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
} catch (MusicServiceException | MusicQueryException ex) {
- logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
return new ReturnType(ResultType.FAILURE, ex.getMessage());
}
try {
result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
} catch (MusicServiceException | MusicQueryException ex) {
- logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
- logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause(), ex);
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
+ ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " ", ex);
return new ReturnType(ResultType.FAILURE, ex.getMessage());
}
if (result) {
if (conditionInfo != null)
try {
- if (conditionInfo.testCondition() == false)
- return new ReturnType(ResultType.FAILURE,
- "Lock acquired but the condition is not true");
+ if (conditionInfo.testCondition() == false)
+ return new ReturnType(ResultType.FAILURE,
+ "Lock acquired but the condition is not true");
} catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger, e);
- return new ReturnType(ResultType.FAILURE,
- "Exception thrown while checking the condition, check its sanctity:\n"
- + e.getMessage());
+ logger.error(EELFLoggerDelegate.errorLogger, e);
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown while checking the condition, check its sanctity:\n"
+ + e.getMessage());
}
-
- String query = queryObject.getQuery();
- long timeOfWrite = System.currentTimeMillis();
- long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
- long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
- // TODO: use Statement instead of modifying query
+ String query = queryObject.getQuery();
+ long timeOfWrite = System.currentTimeMillis();
+ long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
+ long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
+ // TODO: use Statement instead of modifying query
if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
if (queryObject.getOperation().equalsIgnoreCase("delete"))
query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
else
query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
}
- queryObject.replaceQueryString(query);
- MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
- dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
- long end = System.currentTimeMillis();
- logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
+ queryObject.replaceQueryString(query);
+ MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
+ dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
+ long end = System.currentTimeMillis();
+ logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
}catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
return new ReturnType(ResultType.FAILURE,
- "Exception thrown while doing the critical put\n"
- + e.getMessage());
+ "Exception thrown while doing the critical put\n"
+ + e.getMessage());
}
- return new ReturnType(ResultType.SUCCESS, "Update performed");
+ return new ReturnType(ResultType.SUCCESS, "Update performed");
}
String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
String lockId = createLockReference(fullyQualifiedKey);
long lockCreationTime = System.currentTimeMillis();
- ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
+ ReturnType lockAcqResult = null;
+ logger.info(EELFLoggerDelegate.applicationLogger,"***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : "+ primaryKey);
+ logger.info(EELFLoggerDelegate.applicationLogger,"***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
+ if(conditionInfo!=null)
+ logger.info(EELFLoggerDelegate.applicationLogger,"***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
+ try {
+ lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId,MusicUtil.getDefaultLockLeasePeriod());
+ }catch(MusicLockingException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger,"Exception while acquireLockWithLease() in atomic put for key: "+primaryKey);
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage());
+ throw new MusicServiceException("Cannot perform atomic put for key: "+primaryKey+ " : "+ ex.getMessage());
+ }
long lockAcqTime = System.currentTimeMillis();
- if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
- logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
- voluntaryReleaseLock(fullyQualifiedKey,lockId);
- return lockAcqResult;
- }
+ /*
+ * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ * logger.info(EELFLoggerDelegate.
+ * applicationLogger,"unable to acquire lock, id " + lockId);
+ * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
+ */
logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
String lockRef = lockId.substring(lockId.lastIndexOf("$"));
- ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
- queryObject, lockRef, conditionInfo);
- long criticalPutTime = System.currentTimeMillis();
- voluntaryReleaseLock(fullyQualifiedKey,lockId);
- long lockDeleteTime = System.currentTimeMillis();
- String timingInfo = "|lock creation time:" + (lockCreationTime - start)
- + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
- + "|critical put time:" + (criticalPutTime - lockAcqTime)
+ ReturnType criticalPutResult = null ;
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef,
+ conditionInfo);
+ long criticalPutTime = System.currentTimeMillis();
+ long lockDeleteTime = System.currentTimeMillis();
+ String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
+ + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
+ "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
- criticalPutResult.setTimingInfo(timingInfo);
+ criticalPutResult.setTimingInfo(timingInfo);
+ }else {
+ logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
+ criticalPutResult = lockAcqResult;
+ }
+ try {
+ voluntaryReleaseLock(fullyQualifiedKey, lockId);
+
+ } catch (MusicLockingException ex) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Exception occured while deleting lock after atomic put for key: "+primaryKey);
+ criticalPutResult.setMessage(criticalPutResult.getMessage()+ "Lock release failed");
+ }
return criticalPutResult;
}
-
/**
* This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
*
String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
String lockId = createLockReference(fullyQualifiedKey);
long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
- ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
+ ReturnType lockAcqResult = null;
+ ResultSet result =null;
+ logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock for atomicGet() : " + queryObject.getQuery());
+ try {
+ lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId,MusicUtil.getDefaultLockLeasePeriod());
+ }catch(MusicLockingException ex) {
+ logger.error(EELFLoggerDelegate.errorLogger,"Exception while acquireLockWithLease() in atomic get for key: "+primaryKey);
+ logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage());
+ throw new MusicServiceException("Cannot perform atomic get for key: "+primaryKey+ " : "+ ex.getMessage());
+ }
if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
String lockRef = lockId.substring(lockId.lastIndexOf("$"));
- ResultSet result =
- criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
- voluntaryReleaseLock(fullyQualifiedKey,lockId);
- return result;
+ result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
} else {
- voluntaryReleaseLock(fullyQualifiedKey,lockId);
logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
- return null;
+
}
+ try {
+ voluntaryReleaseLock(fullyQualifiedKey,lockId);
+ }catch(MusicLockingException ex) {
+ logger.info(EELFLoggerDelegate.applicationLogger,"Exception occured while deleting lock after atomic put for key: "+primaryKey);
+ throw new MusicLockingException(ex.getMessage());
+ }
+
+ return result;
}
}
@Override
+ @Deprecated
public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
- PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
- //Deprecated
+ PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
return null;
}
+
@Override
public List<String> getLockQueue(String fullyQualifiedKey)
- throws MusicServiceException, MusicQueryException, MusicLockingException {
+ throws MusicServiceException, MusicQueryException, MusicLockingException {
String[] splitString = fullyQualifiedKey.split("\\.");
String keyspace = splitString[0];
String table = splitString[1];
}
@Override
public long getLockQueueSize(String fullyQualifiedKey)
- throws MusicServiceException, MusicQueryException, MusicLockingException {
+ throws MusicServiceException, MusicQueryException, MusicLockingException {
String[] splitString = fullyQualifiedKey.split("\\.");
String keyspace = splitString[0];
String table = splitString[1];
return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
}
+
@Override
+ @Deprecated
public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
//deprecated