"Create lock queue/table for " + keyspace+"."+table);
table = table_prepend_name+table;
String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
- + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
+ + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, writeLock boolean, PRIMARY KEY ((key), lockReference) ) "
+ "WITH CLUSTERING ORDER BY (lockReference ASC);";
PreparedQueryObject queryObject = new PreparedQueryObject();
* @throws MusicServiceException
* @throws MusicQueryException
*/
- public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException {
+ public String genLockRefandEnQueue(String keyspace, String table, String lockName)
+ throws MusicServiceException, MusicQueryException {
+ return genLockRefandEnQueue(keyspace, table, lockName, true);
+ }
+
+
+ /**
+ * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
+ * @param keyspace of the locks.
+ * @param table of the locks.
+ * @param lockName is the primary key of the lock table
+ * @param isWriteLock true if this lock needs to be a write lock
+ * @return the UUID lock reference.
+ * @throws MusicServiceException
+ * @throws MusicQueryException
+ */
+ public String genLockRefandEnQueue(String keyspace, String table, String lockName, boolean isWriteLock)
+ throws MusicServiceException, MusicQueryException {
logger.info(EELFLoggerDelegate.applicationLogger,
"Create lock reference for " + keyspace + "." + table + "." + lockName);
table = table_prepend_name+table;
" UPDATE " + keyspace + "." + table +
" SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
" INSERT INTO " + keyspace + "." + table +
- "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
+ "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
queryObject.addValue(lockRef);
queryObject.addValue(lockName);
queryObject.addValue(lockRef);
queryObject.addValue(String.valueOf(lockEpochMillis));
queryObject.addValue("0");
+ queryObject.addValue(isWriteLock);
queryObject.appendQueryString(insQuery);
boolean pResult = dsHandle.executePut(queryObject, "critical");
return String.valueOf(lockRef);
* @throws MusicServiceException
* @throws MusicQueryException
*/
- public LockObject peekLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException{
+ public LockObject peekLockQueue(String keyspace, String table, String key)
+ throws MusicServiceException, MusicQueryException {
logger.info(EELFLoggerDelegate.applicationLogger,
"Peek in lock table for " + keyspace+"."+table+"."+key);
table = table_prepend_name+table;
return new LockObject(lockReference, createTime,acquireTime);
}
+ public boolean isTopOfLockQueue(String keyspace, String table, String key, String lockRef)
+ throws MusicServiceException, MusicQueryException {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ "Checking in lock table for " + keyspace + "." + table + "." + key);
+ table = table_prepend_name + table;
+ String selectQuery =
+ "select * from " + keyspace + "." + table + " where key='" + key + "';";
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString(selectQuery);
+ ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+
+ boolean topOfQueue = true;
+ for (Row row : rs) {
+ if (row.getBool("writeLock")) {
+ if (topOfQueue && lockRef.equals("" + row.getLong("lockReference"))) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ if (lockRef.equals("" + row.getLong("lockReference"))) {
+ return true;
+ }
+ topOfQueue = false;
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
+ + " in the lock queue. It has expired and no longer exists.");
+ return false;
+ }
/**
* This method removes the lock ref from the lock table/queue for the key.
return musicCore.createLockReference(fullyQualifiedKey);
}
+ public static String createLockReference(String fullyQualifiedKey, boolean isWriteLock) {
+ return musicCore.createLockReference(fullyQualifiedKey, isWriteLock);
+ }
+
public static MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicServiceException, MusicQueryException{
return musicCore.forciblyReleaseLock(fullyQualifiedKey, lockReference);
}
// Core Music Locking Service Methods
public String createLockReference(String fullyQualifiedKey); // lock name
+
+ public String createLockReference(String fullyQualifiedKey, boolean writeLock);
public ReturnType acquireLockWithLease(String key, String lockReference, long leasePeriod)
throws MusicLockingException, MusicQueryException, MusicServiceException; // key,lock id,time
public long getLockQueueSize(String fullyQualifiedKey)
throws MusicServiceException, MusicQueryException, MusicLockingException;
+
}
}
-
public String createLockReference(String fullyQualifiedKey) {
+ return createLockReference(fullyQualifiedKey, true);
+ }
+
+ public String createLockReference(String fullyQualifiedKey, boolean isWriteLock) {
String[] splitString = fullyQualifiedKey.split("\\.");
String keyspace = splitString[0];
String table = splitString[1];
long start = System.currentTimeMillis();
String lockReference = null;
try {
- lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
+ lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, isWriteLock);
} catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
e.printStackTrace();
}
}
}
- private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
-
- //return failure to lock holders too early or already evicted from the lock store
- String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+ private static ReturnType isTopOfLockStore(String keyspace, String table,
+ String primaryKeyValue, String lockReference)
+ throws MusicLockingException, MusicQueryException, MusicServiceException {
+
+ // return failure to lock holders too early or already evicted from the lock store
+ String topOfLockStoreS =
+ getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
long lockReferenceL = Long.parseLong(lockReference);
- if(lockReferenceL > topOfLockStoreL) {
- logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
- return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
+ if (lockReferenceL > topOfLockStoreL) {
+ // only need to check if this is a read lock....
+ if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue,
+ lockReference)) {
+ return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values");
+ }
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ lockReference + " is not the lock holder yet");
+ return new ReturnType(ResultType.FAILURE,
+ lockReference + " is not the lock holder yet");
+ }
+
+
+ if (lockReferenceL < topOfLockStoreL) {
+ logger.info(EELFLoggerDelegate.applicationLogger,
+ lockReference + " is no longer/or was never in the lock store queue");
+ return new ReturnType(ResultType.FAILURE,
+ lockReference + " is no longer/or was never in the lock store queue");
}
-
- if(lockReferenceL < topOfLockStoreL) {
- logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
- return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
- }
-
- return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
+ return new ReturnType(ResultType.SUCCESS, lockReference + " is top of lock store");
}
public ReturnType acquireLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
package org.onap.music.unittests;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.util.List;
}
assertEquals(21, lockStore.getLockQueueSize(CassandraCQL.keyspace, CassandraCQL.table, "test"));
}
+
+ @Test
+ public void Test_testCreateReadLock() throws MusicServiceException, MusicQueryException {
+ lockStore.createLockQueue(CassandraCQL.keyspace, CassandraCQL.table);
+ String readLockRef1 = lockStore.genLockRefandEnQueue(CassandraCQL.keyspace,
+ CassandraCQL.table, "test", false);
+ assertEquals(readLockRef1,
+ lockStore.peekLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test").lockRef);
+ assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+ readLockRef1));
+
+ String readLockRef2 = lockStore.genLockRefandEnQueue(CassandraCQL.keyspace,
+ CassandraCQL.table, "test", false);
+ assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+ readLockRef2));
+
+ String writelockRef3 =
+ lockStore.genLockRefandEnQueue(CassandraCQL.keyspace, CassandraCQL.table, "test");
+ String writelockRef4 =
+ lockStore.genLockRefandEnQueue(CassandraCQL.keyspace, CassandraCQL.table, "test");
+ assertFalse(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+ writelockRef3));
+
+ lockStore.deQueueLockRef(CassandraCQL.keyspace, CassandraCQL.table, "test", readLockRef1);
+ assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+ readLockRef2));
+
+ lockStore.deQueueLockRef(CassandraCQL.keyspace, CassandraCQL.table, "test", readLockRef2);
+ assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+ writelockRef3));
+ assertFalse(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+ writelockRef4));
+
+ lockStore.deQueueLockRef(CassandraCQL.keyspace, CassandraCQL.table, "test", writelockRef3);
+ assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+ writelockRef4));
+ assertFalse(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+ readLockRef1));
+ }
}