X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fmusic%2Flockingservice%2Fcassandra%2FCassaLockStore.java;h=06a087ab2b775428cd01902f4bbb84e40e740b23;hb=881f14bc8676cedd68e17bd007a869fa85578fa1;hp=4ed635759176b067aa4b350bdabb20d8a49fccaf;hpb=695450aa1408f9577e4814f1aae4911dfbae953f;p=music.git diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java index 4ed63575..06a087ab 100644 --- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java +++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java @@ -29,8 +29,10 @@ import java.util.List; import org.onap.music.datastore.MusicDataStore; import org.onap.music.datastore.PreparedQueryObject; import org.onap.music.eelf.logging.EELFLoggerDelegate; +import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.main.MusicUtil; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; @@ -43,19 +45,8 @@ public class CassaLockStore { private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class); private static String table_prepend_name = "lockQ_"; + private MusicDataStore dsHandle; - public class LockObject{ - public String lockRef; - public String createTime; - public String acquireTime; - public LockObject(String lockRef, String createTime, String acquireTime) { - this.lockRef = lockRef; - this.acquireTime = acquireTime; - this.createTime = createTime; - - } - } - MusicDataStore dsHandle; public CassaLockStore() { dsHandle = new MusicDataStore(); } @@ -63,7 +54,50 @@ public class CassaLockStore { public CassaLockStore(MusicDataStore dsHandle) { this.dsHandle=dsHandle; } - + public class LockObject{ + private boolean isLockOwner; + private String lockRef; + private String createTime; + private String acquireTime; + private LockType locktype; + public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype) { + this.setIsLockOwner(isLockOwner); + this.setLockRef(lockRef); + this.setAcquireTime(acquireTime); + this.setCreateTime(createTime); + this.setLocktype(locktype); + } + public boolean getIsLockOwner() { + return isLockOwner; + } + public void setIsLockOwner(boolean isLockOwner) { + this.isLockOwner = isLockOwner; + } + public String getAcquireTime() { + return acquireTime; + } + public void setAcquireTime(String acquireTime) { + this.acquireTime = acquireTime; + } + public String getCreateTime() { + return createTime; + } + public void setCreateTime(String createTime) { + this.createTime = createTime; + } + public String getLockRef() { + return lockRef; + } + public void setLockRef(String lockRef) { + this.lockRef = lockRef; + } + public LockType getLocktype() { + return locktype; + } + public void setLocktype(LockType locktype) { + this.locktype = locktype; + } + } /** * @@ -79,7 +113,8 @@ public class CassaLockStore { "Create lock queue/table for " + keyspace+"."+table); table = table_prepend_name+table; String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table - + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) " + + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, " + + "writeLock boolean, PRIMARY KEY ((key), lockReference) ) " + "WITH CLUSTERING ORDER BY (lockReference ASC);"; PreparedQueryObject queryObject = new PreparedQueryObject(); @@ -98,12 +133,16 @@ public class CassaLockStore { * @throws MusicServiceException * @throws MusicQueryException */ - public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException { + public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype) throws MusicServiceException, MusicQueryException, MusicLockingException { + return genLockRefandEnQueue(keyspace, table, lockName, locktype, 0); + } + + private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, int count) throws MusicServiceException, MusicQueryException, MusicLockingException { logger.info(EELFLoggerDelegate.applicationLogger, - "Create lock reference for " + keyspace + "." + table + "." + lockName); + "Create " + locktype + " lock reference for " + keyspace + "." + table + "." + lockName); String lockTable =""; lockTable = table_prepend_name+table; - + PreparedQueryObject queryObject = new PreparedQueryObject(); @@ -131,7 +170,7 @@ public class CassaLockStore { " UPDATE " + keyspace + "." + lockTable + " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" + " INSERT INTO " + keyspace + "." + lockTable + - "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; + "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; queryObject.addValue(lockRef); queryObject.addValue(lockName); @@ -142,13 +181,25 @@ public class CassaLockStore { queryObject.addValue(lockRef); queryObject.addValue(String.valueOf(lockEpochMillis)); queryObject.addValue("0"); + queryObject.addValue(locktype==LockType.WRITE ? true : false ); queryObject.appendQueryString(insQuery); - dsHandle.executePut(queryObject, "critical"); - return "$"+keyspace+"."+table+"."+lockName+"$"+ lockRef; + boolean pResult = dsHandle.executePut(queryObject, "critical"); + if (!pResult) {// couldn't create lock ref, retry + count++; + if (count > MusicUtil.getRetryCount()) { + logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference"); + throw new MusicLockingException("Unable to create lock reference"); + } + return genLockRefandEnQueue(keyspace, table, lockName, locktype, count); + } + return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef); } + + /** * Returns a result set containing the list of clients waiting for a particular lock + * * @param keyspace * @param table * @param key @@ -159,22 +210,23 @@ public class CassaLockStore { public List getLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException { logger.info(EELFLoggerDelegate.applicationLogger, - "Getting the queue for " + keyspace+"."+table+"."+key); - table = table_prepend_name+table; + "Getting the queue for " + keyspace + "." + table + "." + key); + table = table_prepend_name + table; String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';"; PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString(selectQuery); ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject); ArrayList lockQueue = new ArrayList<>(); - for (Row row: rs) { + for (Row row : rs) { lockQueue.add(Long.toString(row.getLong("lockReference"))); } return lockQueue; } - - + + /** * Returns a result set containing the list of clients waiting for a particular lock + * * @param keyspace * @param table * @param key @@ -185,70 +237,221 @@ public class CassaLockStore { public long getLockQueueSize(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException { logger.info(EELFLoggerDelegate.applicationLogger, - "Getting the queue size for " + keyspace+"."+table+"."+key); - table = table_prepend_name+table; + "Getting the queue size for " + keyspace + "." + table + "." + key); + table = table_prepend_name + table; String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';"; PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString(selectQuery); - ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject); - return rs.one().getLong("count"); - } + ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject); + return rs.one().getLong("count"); + } /** * This method returns the top of lock table/queue for the key. + * * @param keyspace of the application. * @param table of the application. * @param key is the primary key of the application table - * @return the UUID lock reference. + * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the + * lock doesn't exist * @throws MusicServiceException * @throws MusicQueryException */ - public LockObject peekLockQueue(String keyspace, String table, String key) throws MusicServiceException, MusicQueryException{ + public LockObject peekLockQueue(String keyspace, String table, String key) + throws MusicServiceException, MusicQueryException { logger.info(EELFLoggerDelegate.applicationLogger, - "Peek in lock table for " + keyspace+"."+table+"."+key); - table = table_prepend_name+table; - String selectQuery = "select * from "+keyspace+"."+table+" where key='"+key+"' LIMIT 1;"; + "Peek in lock table for " + keyspace + "." + table + "." + key); + table = table_prepend_name + table; + String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;"; PreparedQueryObject queryObject = new PreparedQueryObject(); queryObject.appendQueryString(selectQuery); ResultSet results = dsHandle.executeOneConsistencyGet(queryObject); Row row = results.one(); + if (row == null || row.isNull("lockReference")) { + return new LockObject(false, null, null, null, null); + } String lockReference = "" + row.getLong("lockReference"); String createTime = row.getString("createTime"); String acquireTime = row.getString("acquireTime"); + LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ; - return new LockObject(lockReference, createTime,acquireTime); + return new LockObject(true, lockReference, createTime, acquireTime, locktype); } - - + + public List getCurrentLockHolders(String keyspace, String table, String key) + throws MusicServiceException, MusicQueryException { + logger.info(EELFLoggerDelegate.applicationLogger, + "Getting lockholders in lock table for " + keyspace + "." + table + "." + key); + String origTable = table; + table = table_prepend_name + table; + String selectQuery = "select * from " + keyspace + "." + table + " where key=?;"; + List lockHolders = new ArrayList<>(); + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString(selectQuery); + queryObject.addValue(key); + ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject); + boolean topOfQueue = true; + StringBuilder lock = new StringBuilder(). + append("$").append(keyspace).append(".").append(origTable). + append(".").append(key).append("$"); + StringBuilder lockReference = new StringBuilder(); + for (Row row : rs) { + if ( row.isNull("lockReference") ) { + return lockHolders; + } + lockReference.append(lock).append(row.getLong("lockReference")); + if (row.isNull("writeLock") || row.getBool("writeLock")) { + if (topOfQueue) { + lockHolders.add(lockReference.toString()); + break; + } else { + break; + } + } + // read lock + lockHolders.add(lockReference.toString()); + + topOfQueue = false; + lockReference.delete(0,lockReference.length()); + } + return lockHolders; + } + /** - * This method removes the lock ref from the lock table/queue for the key. - * @param keyspace of the application. - * @param table of the application. + * Determine if the lock is a valid current lock holder. + * + * @param keyspace + * @param table + * @param key + * @param lockRef + * @return true if lockRef is a lock owner of key + * @throws MusicServiceException + * @throws MusicQueryException + */ + public boolean isLockOwner(String keyspace, String table, String key, String lockRef) + throws MusicServiceException, MusicQueryException { + logger.info(EELFLoggerDelegate.applicationLogger, + "Checking in lock table for " + keyspace + "." + table + "." + key); + table = table_prepend_name + table; + String selectQuery = + "select * from " + keyspace + "." + table + " where key=?;"; + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString(selectQuery); + queryObject.addValue(key); + ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject); + + boolean topOfQueue = true; + for (Row row : rs) { + String lockReference = "" + row.getLong("lockReference"); + if (row.isNull("writeLock") || row.getBool("writeLock")) { + if (topOfQueue && lockRef.equals(lockReference)) { + return true; + } else { + return false; + } + } + if (lockRef.equals(lockReference)) { + return true; + } + topOfQueue = false; + } + logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef + + " in the lock queue. It has expired and no longer exists."); + return false; + } + + /** + * Determine if the lock is a valid current lock holder. + * + * @param keyspace + * @param table + * @param key + * @param lockRef + * @return true if lockRef is a lock owner of key + * @throws MusicServiceException + * @throws MusicQueryException + */ + public LockObject getLockInfo(String keyspace, String table, String key, String lockRef) + throws MusicServiceException, MusicQueryException { + logger.info(EELFLoggerDelegate.applicationLogger, + "Checking in lock table for " + keyspace + "." + table + "." + key); + String lockQ_table = table_prepend_name + table; + String selectQuery = + "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;"; + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString(selectQuery); + queryObject.addValue(key); + queryObject.addValue(Long.parseLong(lockRef)); + ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject); + Row row = rs.one(); + if (row == null || row.isNull("lockReference")) { + return null; + } + + String lockReference = "" + row.getLong("lockReference"); + String createTime = row.getString("createTime"); + String acquireTime = row.getString("acquireTime"); + LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ; + boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef); + + return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype); + } + + + + /** + * This method removes the lock ref from the lock table/queue for the key. + * + * @param keyspace of the application. + * @param table of the application. * @param key is the primary key of the application table * @param lockReference the lock reference that needs to be dequeued. * @throws MusicServiceException * @throws MusicQueryException - */ - public void deQueueLockRef(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{ - table = table_prepend_name+table; + * @throws MusicLockingException + */ + public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n) + throws MusicServiceException, MusicQueryException, MusicLockingException { + String prependTable = table_prepend_name + table; PreparedQueryObject queryObject = new PreparedQueryObject(); - Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf('$')+1)); - String deleteQuery = "delete from "+keyspace+"."+table+" where key='"+key+"' AND lockReference ="+lockReferenceL+" IF EXISTS;"; + Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1)); + String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key + + "' AND lockReference =" + lockReferenceL + " IF EXISTS;"; queryObject.appendQueryString(deleteQuery); - dsHandle.executePut(queryObject, "critical"); + logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference); + try { + dsHandle.executePut(queryObject, "critical"); + logger.info(EELFLoggerDelegate.applicationLogger, + "Lock removed for key: " + key + " and reference: " + lockReference); + } catch (MusicServiceException ex) { + logger.error(logger, ex.getMessage(), ex); + logger.error(EELFLoggerDelegate.applicationLogger, + "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference); + if (n > 1) { + logger.info(EELFLoggerDelegate.applicationLogger, "Trying again..."); + deQueueLockRef(keyspace, table, key, lockReference, n - 1); + } else { + logger.error(EELFLoggerDelegate.applicationLogger, + "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference); + logger.error(logger, ex.getMessage(), ex); + throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage()); + } + } } - - public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) throws MusicServiceException, MusicQueryException{ - table = table_prepend_name+table; + + public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) + throws MusicServiceException, MusicQueryException { + table = table_prepend_name + table; PreparedQueryObject queryObject = new PreparedQueryObject(); Long lockReferenceL = Long.parseLong(lockReference); - String updateQuery = "update "+keyspace+"."+table+" set acquireTime='"+ System.currentTimeMillis()+"' where key='"+key+"' AND lockReference = "+lockReferenceL+" IF EXISTS;"; + String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis() + + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;"; queryObject.appendQueryString(updateQuery); - dsHandle.executePut(queryObject, "eventual"); - } - + //cannot use executePut because we need to ignore music timestamp adjustments for lock store + dsHandle.getSession().execute(updateQuery); + } }