X-Git-Url: https://gerrit.onap.org/r/gitweb?p=music.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fmusic%2Flockingservice%2Fcassandra%2FCassaLockStore.java;h=108984760b4939af9a202afb23715f9e7061c8fc;hp=06a087ab2b775428cd01902f4bbb84e40e740b23;hb=90d35b7f55d1ea3eb6ccf8218d9ac42412fd0d90;hpb=fcca903302375403ff16216ce9b4b3cca2d1362c diff --git a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java index 06a087ab..10898476 100644 --- a/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java +++ b/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java @@ -24,6 +24,7 @@ package org.onap.music.lockingservice.cassandra; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.onap.music.datastore.MusicDataStore; @@ -32,10 +33,15 @@ import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.exceptions.MusicLockingException; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; +import org.onap.music.main.DeadlockDetectionUtil; +import org.onap.music.main.DeadlockDetectionUtil.OwnershipType; import org.onap.music.main.MusicUtil; - +import org.onap.music.main.ResultType; +import org.onap.music.main.ReturnType; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.extras.codecs.enums.EnumNameCodec; /* * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. @@ -60,12 +66,15 @@ public class CassaLockStore { private String createTime; private String acquireTime; private LockType locktype; - public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype) { + // Owner is the self-declared client which "owns" this row. It is used for deadlock detection. It is not (directly) related to isLockOwner. + private String owner; + public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype, String owner) { this.setIsLockOwner(isLockOwner); this.setLockRef(lockRef); this.setAcquireTime(acquireTime); this.setCreateTime(createTime); this.setLocktype(locktype); + this.setOwner(owner); } public boolean getIsLockOwner() { return isLockOwner; @@ -97,6 +106,12 @@ public class CassaLockStore { public void setLocktype(LockType locktype) { this.locktype = locktype; } + public String getOwner() { + return owner; + } + public void setOwner(String owner) { + this.owner = owner; + } } /** @@ -114,7 +129,7 @@ public class CassaLockStore { table = table_prepend_name+table; String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, " - + "writeLock boolean, PRIMARY KEY ((key), lockReference) ) " + + "lockType text, owner text, PRIMARY KEY ((key), lockReference) ) " + "WITH CLUSTERING ORDER BY (lockReference ASC);"; PreparedQueryObject queryObject = new PreparedQueryObject(); @@ -129,22 +144,22 @@ public class CassaLockStore { * @param keyspace of the locks. * @param table of the locks. * @param lockName is the primary key of the lock table + * @param lockType is the type of lock (read/write) + * @param owner is the owner of the lock (optional, for deadlock detection) * @return the UUID lock reference. * @throws MusicServiceException * @throws MusicQueryException */ - public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype) throws MusicServiceException, MusicQueryException, MusicLockingException { - return genLockRefandEnQueue(keyspace, table, lockName, locktype, 0); + public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner) throws MusicServiceException, MusicQueryException, MusicLockingException { + return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, 0); } - private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, int count) throws MusicServiceException, MusicQueryException, MusicLockingException { + private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner, int count) throws MusicServiceException, MusicQueryException, MusicLockingException { logger.info(EELFLoggerDelegate.applicationLogger, "Create " + locktype + " lock reference for " + keyspace + "." + table + "." + lockName); String lockTable =""; lockTable = table_prepend_name+table; - - PreparedQueryObject queryObject = new PreparedQueryObject(); String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;"; @@ -164,13 +179,14 @@ public class CassaLockStore { logger.info(EELFLoggerDelegate.applicationLogger, "Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef); - + queryObject = new PreparedQueryObject(); + String insQuery = "BEGIN BATCH" + " UPDATE " + keyspace + "." + lockTable + " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" + " INSERT INTO " + keyspace + "." + lockTable + - "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; + "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;"; queryObject.addValue(lockRef); queryObject.addValue(lockName); @@ -181,7 +197,8 @@ public class CassaLockStore { queryObject.addValue(lockRef); queryObject.addValue(String.valueOf(lockEpochMillis)); queryObject.addValue("0"); - queryObject.addValue(locktype==LockType.WRITE ? true : false ); + queryObject.addValue(locktype); + queryObject.addValue(owner); queryObject.appendQueryString(insQuery); boolean pResult = dsHandle.executePut(queryObject, "critical"); if (!pResult) {// couldn't create lock ref, retry @@ -190,14 +207,12 @@ public class CassaLockStore { logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference"); throw new MusicLockingException("Unable to create lock reference"); } - return genLockRefandEnQueue(keyspace, table, lockName, locktype, count); + return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, count); } return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef); } - - - /** + /** * Returns a result set containing the list of clients waiting for a particular lock * * @param keyspace @@ -269,14 +284,15 @@ public class CassaLockStore { ResultSet results = dsHandle.executeOneConsistencyGet(queryObject); Row row = results.one(); if (row == null || row.isNull("lockReference")) { - return new LockObject(false, null, null, null, null); + return new LockObject(false, null, null, null, null, null); } String lockReference = "" + row.getLong("lockReference"); String createTime = row.getString("createTime"); String acquireTime = row.getString("acquireTime"); - LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ; + LockType locktype = row.get("lockType", LockType.class); + String owner = row.getString("owner"); - return new LockObject(true, lockReference, createTime, acquireTime, locktype); + return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner); } public List getCurrentLockHolders(String keyspace, String table, String key) @@ -301,7 +317,7 @@ public class CassaLockStore { return lockHolders; } lockReference.append(lock).append(row.getLong("lockReference")); - if (row.isNull("writeLock") || row.getBool("writeLock")) { + if (row.get("lockType", LockType.class)!=LockType.WRITE) { if (topOfQueue) { lockHolders.add(lockReference.toString()); break; @@ -344,7 +360,7 @@ public class CassaLockStore { boolean topOfQueue = true; for (Row row : rs) { String lockReference = "" + row.getLong("lockReference"); - if (row.isNull("writeLock") || row.getBool("writeLock")) { + if (row.get("lockType", LockType.class)==LockType.WRITE) { if (topOfQueue && lockRef.equals(lockReference)) { return true; } else { @@ -392,10 +408,11 @@ public class CassaLockStore { String lockReference = "" + row.getLong("lockReference"); String createTime = row.getString("createTime"); String acquireTime = row.getString("acquireTime"); - LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ; + LockType locktype = row.get("lockType", LockType.class); boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef); + String owner = row.getString("owner"); - return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype); + return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype, owner); } @@ -441,17 +458,147 @@ public class CassaLockStore { } - public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) - throws MusicServiceException, MusicQueryException { + public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) { table = table_prepend_name + table; - PreparedQueryObject queryObject = new PreparedQueryObject(); Long lockReferenceL = Long.parseLong(lockReference); String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis() + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;"; - queryObject.appendQueryString(updateQuery); //cannot use executePut because we need to ignore music timestamp adjustments for lock store dsHandle.getSession().execute(updateQuery); } + public boolean checkForDeadlock(String keyspace, String table, String lockName, LockType locktype, String owner, boolean forAcquire) throws MusicServiceException, MusicQueryException { + if (locktype.equals(LockType.READ)) return false; + if (owner==null || owner.length()==0) return false; + + String lockTable = table_prepend_name + table; + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable); + queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING"); + queryObject.addValue(LockType.WRITE); + + DeadlockDetectionUtil ddu = new DeadlockDetectionUtil(); + + ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject); + logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") ); + Iterator it = rs.iterator(); + while (it.hasNext()) { + Row row = it.next(); + logger.debug("key = " + row.getString("key") + ", time = " + row.getString("acquiretime") + ", owner = " + row.getString("owner") ); + ddu.setExisting(row.getString("key"), row.getString("owner"), ("0".equals(row.getString("acquiretime")))?OwnershipType.CREATED:OwnershipType.ACQUIRED); + } + boolean deadlock = ddu.checkForDeadlock(lockName, owner, forAcquire?OwnershipType.ACQUIRED:OwnershipType.CREATED); + if (deadlock) logger.warn("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + lockTable + "." + lockName); + return deadlock; + } + + public List getAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicServiceException, MusicQueryException { + List toRet = new ArrayList(); + String lockTable = table_prepend_name + table; + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString("SELECT key, lockreference FROM " + keyspace + "." + lockTable); + queryObject.appendQueryString(" WHERE owner = '" + ownerId + "' ALLOW FILTERING"); + + ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject); + Iterator it = rs.iterator(); + while (it.hasNext()) { + Row row = it.next(); + toRet.add(row.getString("key") + "$" + row.getLong("lockreference")); + } + return toRet; + } + + public ReturnType promoteLock(String keyspace, String table, String key, String lockRef) + throws MusicLockingException, MusicServiceException, MusicQueryException { + String lockqtable = table_prepend_name + table; + String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;"; + + PreparedQueryObject queryObject = new PreparedQueryObject(); + queryObject.appendQueryString(selectQuery); + queryObject.addValue(key); + ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject); + + long refToPromote = Long.parseLong(lockRef); + + boolean topOfQueue = true; + boolean readBlock = false; + boolean seenLockToPromote = false; + boolean promotionOngoing = false; + long readBlockStart = 0; + long readBlockEnd = 0; + + + for (Row row : rs) { + long ref = row.getLong("lockreference"); + LockType lockType = row.get("lockType", LockType.class); + + if (refToPromote==ref) { + if (promotionOngoing) { + return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref."); + } + seenLockToPromote = true; + if (!topOfQueue) { + readBlockStart = ref; + readBlockEnd = ref; + break; + } + } else if (!seenLockToPromote && refToPromote