Read lock promotion
[music.git] / src / main / java / org / onap / music / lockingservice / cassandra / CassaLockStore.java
index 3c3f716..1089847 100644 (file)
@@ -24,6 +24,7 @@
 package org.onap.music.lockingservice.cassandra;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.onap.music.datastore.MusicDataStore;
@@ -32,10 +33,15 @@ import org.onap.music.eelf.logging.EELFLoggerDelegate;
 import org.onap.music.exceptions.MusicLockingException;
 import org.onap.music.exceptions.MusicQueryException;
 import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.main.DeadlockDetectionUtil;
+import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
 import org.onap.music.main.MusicUtil;
-
+import org.onap.music.main.ResultType;
+import org.onap.music.main.ReturnType;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
 
 /*
  * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. 
@@ -60,12 +66,15 @@ public class CassaLockStore {
         private String createTime;
         private String acquireTime;
         private LockType locktype;
-        public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype) {
+        // Owner is the self-declared client which "owns" this row. It is used for deadlock detection.  It is not (directly) related to isLockOwner.
+        private String owner;
+        public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype, String owner) {
             this.setIsLockOwner(isLockOwner);
             this.setLockRef(lockRef);
             this.setAcquireTime(acquireTime);
             this.setCreateTime(createTime);
             this.setLocktype(locktype);
+            this.setOwner(owner);
         }
         public boolean getIsLockOwner() {
             return isLockOwner;
@@ -97,6 +106,12 @@ public class CassaLockStore {
         public void setLocktype(LockType locktype) {
             this.locktype = locktype;
         }
+        public String getOwner() {
+            return owner;
+        }
+        public void setOwner(String owner) {
+            this.owner = owner;
+        }
     }
     
     /**
@@ -114,7 +129,7 @@ public class CassaLockStore {
         table = table_prepend_name+table;
         String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
                 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
-                + "writeLock boolean, PRIMARY KEY ((key), lockReference) ) "
+                + "lockType text, owner text, PRIMARY KEY ((key), lockReference) ) "
                 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
         PreparedQueryObject queryObject = new PreparedQueryObject();
         
@@ -129,22 +144,22 @@ public class CassaLockStore {
      * @param keyspace of the locks.
      * @param table of the locks.
      * @param lockName is the primary key of the lock table
+     * @param lockType is the type of lock (read/write)
+     * @param owner is the owner of the lock (optional, for deadlock detection)
      * @return the UUID lock reference.
      * @throws MusicServiceException
      * @throws MusicQueryException
      */
-    public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype) throws MusicServiceException, MusicQueryException, MusicLockingException {
-        return genLockRefandEnQueue(keyspace, table, lockName, locktype, 0);
+    public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner) throws MusicServiceException, MusicQueryException, MusicLockingException {
+        return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, 0);
     }
     
-    private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
+    private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
         logger.info(EELFLoggerDelegate.applicationLogger,
                 "Create " + locktype + " lock reference for " +  keyspace + "." + table + "." + lockName);
         String lockTable ="";
         lockTable = table_prepend_name+table;
     
-
-
         PreparedQueryObject queryObject = new PreparedQueryObject();
         String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
 
@@ -164,13 +179,14 @@ public class CassaLockStore {
 
         logger.info(EELFLoggerDelegate.applicationLogger,
                 "Created lock reference for " +  keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
-
+        
         queryObject = new PreparedQueryObject();
+        
         String insQuery = "BEGIN BATCH" +
                 " UPDATE " + keyspace + "." + lockTable +
                 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
                 " INSERT INTO " + keyspace + "." + lockTable +
-                "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
+                "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
 
         queryObject.addValue(lockRef);
         queryObject.addValue(lockName);
@@ -181,7 +197,8 @@ public class CassaLockStore {
         queryObject.addValue(lockRef);
         queryObject.addValue(String.valueOf(lockEpochMillis));
         queryObject.addValue("0");
-        queryObject.addValue(locktype==LockType.WRITE ? true : false );
+        queryObject.addValue(locktype);
+        queryObject.addValue(owner);
         queryObject.appendQueryString(insQuery);
         boolean pResult = dsHandle.executePut(queryObject, "critical");
         if (!pResult) {// couldn't create lock ref, retry
@@ -190,14 +207,12 @@ public class CassaLockStore {
                 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
                 throw new MusicLockingException("Unable to create lock reference");
             }
-            return genLockRefandEnQueue(keyspace, table, lockName, locktype, count);
+            return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, count);
         }
         return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
     }
-    
-    
 
-    /**
+       /**
      * Returns a result set containing the list of clients waiting for a particular lock
      * 
      * @param keyspace
@@ -269,43 +284,52 @@ public class CassaLockStore {
         ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
         Row row = results.one();
         if (row == null || row.isNull("lockReference")) {
-            return new LockObject(false, null, null, null, null);
+            return new LockObject(false, null, null, null, null, null);
         }
         String lockReference = "" + row.getLong("lockReference");
         String createTime = row.getString("createTime");
         String acquireTime = row.getString("acquireTime");
-        LockType locktype = row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+        LockType locktype = row.get("lockType", LockType.class);
+        String owner = row.getString("owner");
 
-        return new LockObject(true, lockReference, createTime, acquireTime, locktype);
+        return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
     }
 
     public List<String> getCurrentLockHolders(String keyspace, String table, String key)
             throws MusicServiceException, MusicQueryException {
         logger.info(EELFLoggerDelegate.applicationLogger,
                 "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
+        String origTable = table;
         table = table_prepend_name + table;
         String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
+        List<String> lockHolders = new ArrayList<>();
         PreparedQueryObject queryObject = new PreparedQueryObject();
         queryObject.appendQueryString(selectQuery);
         queryObject.addValue(key);
         ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
-
-        List<String> lockHolders = new ArrayList<>();
         boolean topOfQueue = true;
+        StringBuilder lock = new StringBuilder().
+        append("$").append(keyspace).append(".").append(origTable).
+        append(".").append(key).append("$");
+        StringBuilder lockReference = new StringBuilder();
         for (Row row : rs) {
-            String lockReference = "" + row.getLong("lockReference");
-            if (row.getBool("writeLock")) {
+                if ( row.isNull("lockReference") ) {
+                    return lockHolders;
+                }
+                lockReference.append(lock).append(row.getLong("lockReference"));
+            if (row.get("lockType", LockType.class)!=LockType.WRITE) {
                 if (topOfQueue) {
-                    lockHolders.add(lockReference);
+                    lockHolders.add(lockReference.toString());
                     break;
                 } else {
                     break;
                 }
             }
             // read lock
-            lockHolders.add(lockReference);
+            lockHolders.add(lockReference.toString());
 
             topOfQueue = false;
+            lockReference.delete(0,lockReference.length());
         }
         return lockHolders;
     }
@@ -335,16 +359,16 @@ public class CassaLockStore {
 
         boolean topOfQueue = true;
         for (Row row : rs) {
-               String lockReference = "" + row.getLong("lockReference");
-            if (row.getBool("writeLock")) {
+            String lockReference = "" + row.getLong("lockReference");
+            if (row.get("lockType", LockType.class)==LockType.WRITE) {
                 if (topOfQueue && lockRef.equals(lockReference)) {
-                       return true;
+                    return true;
                 } else {
-                       return false;
+                    return false;
                 }
             }
             if (lockRef.equals(lockReference)) {
-               return true;
+                return true;
             }
             topOfQueue = false;
         }
@@ -384,10 +408,11 @@ public class CassaLockStore {
         String lockReference = "" + row.getLong("lockReference");
         String createTime = row.getString("createTime");
         String acquireTime = row.getString("acquireTime");
-        LockType locktype = row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+        LockType locktype = row.get("lockType", LockType.class);
         boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
+        String owner = row.getString("owner");
 
-        return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype);
+        return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype, owner);
     }
 
 
@@ -433,16 +458,147 @@ public class CassaLockStore {
     }
 
 
-    public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference)
-            throws MusicServiceException, MusicQueryException {
+    public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
         table = table_prepend_name + table;
-        PreparedQueryObject queryObject = new PreparedQueryObject();
         Long lockReferenceL = Long.parseLong(lockReference);
         String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
                 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
-        queryObject.appendQueryString(updateQuery);
-        dsHandle.executePut(queryObject, "eventual");
 
+        //cannot use executePut because we need to ignore music timestamp adjustments for lock store
+        dsHandle.getSession().execute(updateQuery);
     }  
 
+    public boolean checkForDeadlock(String keyspace, String table, String lockName, LockType locktype, String owner, boolean forAcquire) throws MusicServiceException, MusicQueryException {
+        if (locktype.equals(LockType.READ)) return false;
+        if (owner==null || owner.length()==0) return false;
+
+        String lockTable = table_prepend_name + table;
+        PreparedQueryObject queryObject = new PreparedQueryObject();
+        queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
+        queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING");
+        queryObject.addValue(LockType.WRITE);
+
+        DeadlockDetectionUtil ddu = new DeadlockDetectionUtil();
+
+        ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
+        logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") );
+        Iterator<Row> it = rs.iterator();
+        while (it.hasNext()) {
+            Row row = it.next();
+            logger.debug("key = " + row.getString("key") + ", time = " + row.getString("acquiretime") + ", owner = " + row.getString("owner") );
+            ddu.setExisting(row.getString("key"), row.getString("owner"), ("0".equals(row.getString("acquiretime")))?OwnershipType.CREATED:OwnershipType.ACQUIRED);
+        }
+        boolean deadlock = ddu.checkForDeadlock(lockName, owner, forAcquire?OwnershipType.ACQUIRED:OwnershipType.CREATED);
+        if (deadlock) logger.warn("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + lockTable + "." + lockName);
+        return deadlock;
+    }
+
+    public List<String> getAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicServiceException, MusicQueryException {
+        List<String> toRet = new ArrayList<String>();
+        String lockTable = table_prepend_name + table;
+        PreparedQueryObject queryObject = new PreparedQueryObject();
+        queryObject.appendQueryString("SELECT key, lockreference FROM " + keyspace + "." + lockTable);
+        queryObject.appendQueryString(" WHERE owner = '" + ownerId + "' ALLOW FILTERING");
+
+        ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
+        Iterator<Row> it = rs.iterator();
+        while (it.hasNext()) {
+            Row row = it.next();
+            toRet.add(row.getString("key") + "$" + row.getLong("lockreference"));
+        }
+        return toRet;
+    }
+
+    public ReturnType promoteLock(String keyspace, String table, String key, String lockRef)
+            throws MusicLockingException, MusicServiceException, MusicQueryException {
+        String lockqtable = table_prepend_name + table;
+        String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;";
+
+        PreparedQueryObject queryObject = new PreparedQueryObject();
+        queryObject.appendQueryString(selectQuery);
+        queryObject.addValue(key);
+        ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+        
+        long refToPromote = Long.parseLong(lockRef);
+
+        boolean topOfQueue = true;
+        boolean readBlock = false;
+        boolean seenLockToPromote = false;
+        boolean promotionOngoing = false;
+        long readBlockStart = 0;
+        long readBlockEnd = 0;
+
+
+        for (Row row : rs) {
+            long ref = row.getLong("lockreference");
+            LockType lockType = row.get("lockType", LockType.class);
+            
+            if (refToPromote==ref) {
+                if (promotionOngoing) {
+                    return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref.");
+                }
+                seenLockToPromote = true;
+                if (!topOfQueue) {
+                    readBlockStart = ref;
+                    readBlockEnd = ref;
+                    break;
+                }
+            } else if (!seenLockToPromote && refToPromote<ref) {
+                return new ReturnType(ResultType.FAILURE, "Lockref does not exist.");
+            }
+            
+            if (lockType==LockType.READ || lockType==LockType.PROMOTING) {
+                if (!readBlock) {
+                    readBlockStart = ref;
+                    readBlock = true;
+                }
+                if (readBlock) {
+                    readBlockEnd = ref;
+                }
+                if (lockType==LockType.PROMOTING) {
+                    promotionOngoing = true;
+                }
+            }
+            
+            if (lockType==LockType.WRITE) {
+                if (refToPromote==ref) {
+                    return new ReturnType(ResultType.FAILURE, "Lockref is already write.");
+                }
+                if (readBlock) {
+                    readBlock = false;
+                    promotionOngoing = false;
+                    if (seenLockToPromote) {
+                        break;
+                    }
+                    //can no longer be lock holder after this
+                    topOfQueue = false;
+                }
+            }
+        }
+
+        if (readBlockStart<=refToPromote && refToPromote<=readBlockEnd) {
+            if (readBlockStart==refToPromote && refToPromote==readBlockEnd) {
+                promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.WRITE);
+                return new ReturnType(ResultType.SUCCESS, "Lock has successfully been upgraded.");
+            }
+            promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.PROMOTING);
+            return new ReturnType(ResultType.FAILURE, "Your lock upgrade is in progress. Check again to see if successful."); 
+        }
+        
+        //shouldn't reach here?
+        return new ReturnType(ResultType.FAILURE,"Promotion failed.");
+    }
+
+    private void promoteLockTo(String keyspace, String table, String key, String lockRef, LockType newLockType)
+            throws MusicServiceException, MusicQueryException {
+        PreparedQueryObject queryObject =
+                new PreparedQueryObject("UPDATE " + keyspace + "." + table + " SET lockType=? WHERE key='" + key
+                        + "' AND lockReference = " + lockRef + " IF EXISTS;", newLockType);
+
+        //cannot use executePut because we need to ignore music timestamp adjustments for lock store
+        dsHandle.executePut(queryObject, MusicUtil.QUORUM);
+    }
+    
+
+
 }