Read/Write locking implementation 90/74590/1
authorTschaen, Brendan <ctschaen@att.com>
Wed, 12 Dec 2018 18:47:54 +0000 (13:47 -0500)
committerTschaen, Brendan <ctschaen@att.com>
Wed, 12 Dec 2018 19:28:49 +0000 (14:28 -0500)
Change-Id: I31fedd52e138c848bf12ed0be27c348f4f96bcb5
Issue-ID: MUSIC-262
Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
src/main/java/org/onap/music/main/MusicCore.java
src/main/java/org/onap/music/service/MusicCoreService.java
src/main/java/org/onap/music/service/impl/MusicCassaCore.java
src/test/java/org/onap/music/unittests/MusicLockStoreTest.java

index 9b5793f..8065bf6 100644 (file)
@@ -56,7 +56,7 @@ public class CassaLockStore {
                 "Create lock queue/table for " +  keyspace+"."+table);
         table = table_prepend_name+table;
                String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
-                               + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
+                               + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, writeLock boolean, PRIMARY KEY ((key), lockReference) ) "
                                + "WITH CLUSTERING ORDER BY (lockReference ASC);";
                PreparedQueryObject queryObject = new PreparedQueryObject();
                
@@ -75,7 +75,24 @@ public class CassaLockStore {
         * @throws MusicServiceException
         * @throws MusicQueryException
         */
-       public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException {
+       public String genLockRefandEnQueue(String keyspace, String table, String lockName)
+                       throws MusicServiceException, MusicQueryException {
+               return genLockRefandEnQueue(keyspace, table, lockName, true);
+       }
+
+       
+       /**
+        * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
+        * @param keyspace of the locks.
+        * @param table of the locks.
+        * @param lockName is the primary key of the lock table
+        * @param isWriteLock true if this lock needs to be a write lock
+        * @return the UUID lock reference.
+        * @throws MusicServiceException
+        * @throws MusicQueryException
+        */
+       public String genLockRefandEnQueue(String keyspace, String table, String lockName, boolean isWriteLock)
+                       throws MusicServiceException, MusicQueryException {
         logger.info(EELFLoggerDelegate.applicationLogger,
                 "Create lock reference for " +  keyspace + "." + table + "." + lockName);
         table = table_prepend_name+table;
@@ -107,7 +124,7 @@ public class CassaLockStore {
                 " UPDATE " + keyspace + "." + table +
                                " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
                 " INSERT INTO " + keyspace + "." + table +
-                               "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
+                               "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
 
         queryObject.addValue(lockRef);
         queryObject.addValue(lockName);
@@ -118,6 +135,7 @@ public class CassaLockStore {
         queryObject.addValue(lockRef);
         queryObject.addValue(String.valueOf(lockEpochMillis));
         queryObject.addValue("0");
+        queryObject.addValue(isWriteLock);
         queryObject.appendQueryString(insQuery);
         boolean pResult = dsHandle.executePut(queryObject, "critical");
                return String.valueOf(lockRef);
@@ -180,7 +198,8 @@ public class CassaLockStore {
         * @throws MusicServiceException
         * @throws MusicQueryException
         */
-       public LockObject peekLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException{
+       public LockObject peekLockQueue(String keyspace, String table, String key)
+                       throws MusicServiceException, MusicQueryException {
         logger.info(EELFLoggerDelegate.applicationLogger,
                 "Peek in lock table for " +  keyspace+"."+table+"."+key);
         table = table_prepend_name+table; 
@@ -196,6 +215,35 @@ public class CassaLockStore {
                return new LockObject(lockReference, createTime,acquireTime);
        }
        
+    public boolean isTopOfLockQueue(String keyspace, String table, String key, String lockRef)
+            throws MusicServiceException, MusicQueryException {
+        logger.info(EELFLoggerDelegate.applicationLogger,
+                "Checking in lock table for " + keyspace + "." + table + "." + key);
+        table = table_prepend_name + table;
+        String selectQuery =
+                "select * from " + keyspace + "." + table + " where key='" + key + "';";
+        PreparedQueryObject queryObject = new PreparedQueryObject();
+        queryObject.appendQueryString(selectQuery);
+        ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+
+        boolean topOfQueue = true;
+        for (Row row : rs) {
+            if (row.getBool("writeLock")) {
+                if (topOfQueue && lockRef.equals("" + row.getLong("lockReference"))) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+            if (lockRef.equals("" + row.getLong("lockReference"))) {
+                return true;
+            }
+            topOfQueue = false;
+        }
+        logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
+                + " in the lock queue. It has expired and no longer exists.");
+        return false;
+    }
        
        /**
         * This method removes the lock ref from the lock table/queue for the key. 
index e0a8ce6..5a77df7 100644 (file)
@@ -79,6 +79,10 @@ private static MusicCoreService musicCore = MusicCassaCore.getInstance();
                return musicCore.createLockReference(fullyQualifiedKey);
        }
        
+       public static String createLockReference(String fullyQualifiedKey, boolean isWriteLock) {
+           return musicCore.createLockReference(fullyQualifiedKey, isWriteLock);
+       }
+       
        public static MusicLockState  forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicServiceException, MusicQueryException{
                return musicCore.forciblyReleaseLock(fullyQualifiedKey, lockReference);
        }
index 3efda27..7074c6d 100644 (file)
@@ -72,6 +72,8 @@ public interface MusicCoreService {
        // Core Music Locking Service Methods
 
        public String createLockReference(String fullyQualifiedKey); // lock name
+       
+    public String createLockReference(String fullyQualifiedKey, boolean writeLock);
 
        public ReturnType acquireLockWithLease(String key, String lockReference, long leasePeriod)
                        throws MusicLockingException, MusicQueryException, MusicServiceException; // key,lock id,time
@@ -100,4 +102,5 @@ public interface MusicCoreService {
        
         public long getLockQueueSize(String fullyQualifiedKey)
                        throws MusicServiceException, MusicQueryException, MusicLockingException;
+
 }
index 9bbb3a9..fcf0280 100644 (file)
@@ -97,8 +97,11 @@ public class MusicCassaCore implements MusicCoreService {
     }
 
 
-
     public  String createLockReference(String fullyQualifiedKey) {
+        return createLockReference(fullyQualifiedKey, true);
+    }
+
+    public  String createLockReference(String fullyQualifiedKey, boolean isWriteLock) {
         String[] splitString = fullyQualifiedKey.split("\\.");
         String keyspace = splitString[0];
         String table = splitString[1];
@@ -108,7 +111,7 @@ public class MusicCassaCore implements MusicCoreService {
         long start = System.currentTimeMillis();
         String lockReference = null;
         try {
-                       lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
+                       lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, isWriteLock);
                } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
                        e.printStackTrace();
                }
@@ -142,25 +145,37 @@ public class MusicCassaCore implements MusicCoreService {
                }       
     }
     
-    private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
-        
-        //return failure to lock holders too early or already evicted from the lock store
-        String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
+    private static ReturnType isTopOfLockStore(String keyspace, String table,
+            String primaryKeyValue, String lockReference)
+            throws MusicLockingException, MusicQueryException, MusicServiceException {
+
+        // return failure to lock holders too early or already evicted from the lock store
+        String topOfLockStoreS =
+                getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
         long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
         long lockReferenceL = Long.parseLong(lockReference);
 
-        if(lockReferenceL > topOfLockStoreL) {
-            logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
-            return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
+        if (lockReferenceL > topOfLockStoreL) {
+            // only need to check if this is a read lock....
+            if (getLockingServiceHandle().isTopOfLockQueue(keyspace, table, primaryKeyValue,
+                    lockReference)) {
+                return new ReturnType(ResultType.SUCCESS, lockReference + " can read the values");
+            }
+            logger.info(EELFLoggerDelegate.applicationLogger,
+                    lockReference + " is not the lock holder yet");
+            return new ReturnType(ResultType.FAILURE,
+                    lockReference + " is not the lock holder yet");
+        }
+
+
+        if (lockReferenceL < topOfLockStoreL) {
+            logger.info(EELFLoggerDelegate.applicationLogger,
+                    lockReference + " is no longer/or was never in the lock store queue");
+            return new ReturnType(ResultType.FAILURE,
+                    lockReference + " is no longer/or was never in the lock store queue");
         }
-                       
 
-        if(lockReferenceL < topOfLockStoreL) {
-            logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
-            return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
-               }
-               
-               return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
+        return new ReturnType(ResultType.SUCCESS, lockReference + " is top of lock store");
     }
     
     public  ReturnType acquireLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
index e57b32e..f818627 100644 (file)
@@ -22,7 +22,9 @@
 package org.onap.music.unittests;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import java.util.List;
 
@@ -125,4 +127,43 @@ public class MusicLockStoreTest {
         }
         assertEquals(21, lockStore.getLockQueueSize(CassandraCQL.keyspace, CassandraCQL.table, "test"));
     }
+    
+    @Test
+    public void Test_testCreateReadLock() throws MusicServiceException, MusicQueryException {
+        lockStore.createLockQueue(CassandraCQL.keyspace, CassandraCQL.table);
+        String readLockRef1 = lockStore.genLockRefandEnQueue(CassandraCQL.keyspace,
+                CassandraCQL.table, "test", false);
+        assertEquals(readLockRef1,
+                lockStore.peekLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test").lockRef);
+        assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+                readLockRef1));
+
+        String readLockRef2 = lockStore.genLockRefandEnQueue(CassandraCQL.keyspace,
+                CassandraCQL.table, "test", false);
+        assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+                readLockRef2));
+
+        String writelockRef3 =
+                lockStore.genLockRefandEnQueue(CassandraCQL.keyspace, CassandraCQL.table, "test");
+        String writelockRef4 =
+                lockStore.genLockRefandEnQueue(CassandraCQL.keyspace, CassandraCQL.table, "test");
+        assertFalse(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+                writelockRef3));
+
+        lockStore.deQueueLockRef(CassandraCQL.keyspace, CassandraCQL.table, "test", readLockRef1);
+        assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+                readLockRef2));
+
+        lockStore.deQueueLockRef(CassandraCQL.keyspace, CassandraCQL.table, "test", readLockRef2);
+        assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+                writelockRef3));
+        assertFalse(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+                writelockRef4));
+
+        lockStore.deQueueLockRef(CassandraCQL.keyspace, CassandraCQL.table, "test", writelockRef3);
+        assertTrue(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+                writelockRef4));
+        assertFalse(lockStore.isTopOfLockQueue(CassandraCQL.keyspace, CassandraCQL.table, "test",
+                readLockRef1));
+    }
 }