package org.onap.music.lockingservice.cassandra;
import java.util.ArrayList;
+ import java.util.Iterator;
import java.util.List;
import org.onap.music.datastore.MusicDataStore;
import org.onap.music.exceptions.MusicLockingException;
import org.onap.music.exceptions.MusicQueryException;
import org.onap.music.exceptions.MusicServiceException;
+ import org.onap.music.main.DeadlockDetectionUtil;
+ import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
import org.onap.music.main.MusicUtil;
import com.datastax.driver.core.ResultSet;
private String createTime;
private String acquireTime;
private LockType locktype;
- public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype) {
+ // Owner is the self-declared client which "owns" this row. It is used for deadlock detection. It is not (directly) related to isLockOwner.
+ private String owner;
+ public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype, String owner) {
this.setIsLockOwner(isLockOwner);
this.setLockRef(lockRef);
this.setAcquireTime(acquireTime);
this.setCreateTime(createTime);
this.setLocktype(locktype);
+ this.setOwner(owner);
}
public boolean getIsLockOwner() {
return isLockOwner;
public void setLocktype(LockType locktype) {
this.locktype = locktype;
}
+ public String getOwner() {
+ return owner;
+ }
+ public void setOwner(String owner) {
+ this.owner = owner;
+ }
}
/**
table = table_prepend_name+table;
String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
+ " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
- + "writeLock boolean, PRIMARY KEY ((key), lockReference) ) "
+ + "writeLock boolean, owner text, PRIMARY KEY ((key), lockReference) ) "
+ "WITH CLUSTERING ORDER BY (lockReference ASC);";
PreparedQueryObject queryObject = new PreparedQueryObject();
* @param keyspace of the locks.
* @param table of the locks.
* @param lockName is the primary key of the lock table
+ * @param lockType is the type of lock (read/write)
+ * @param owner is the owner of the lock (optional, for deadlock detection)
* @return the UUID lock reference.
* @throws MusicServiceException
* @throws MusicQueryException
*/
- public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype) throws MusicServiceException, MusicQueryException, MusicLockingException {
- return genLockRefandEnQueue(keyspace, table, lockName, locktype, 0);
+ public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner) throws MusicServiceException, MusicQueryException, MusicLockingException {
+ return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, 0);
}
- private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
+ private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
logger.info(EELFLoggerDelegate.applicationLogger,
"Create " + locktype + " lock reference for " + keyspace + "." + table + "." + lockName);
String lockTable ="";
lockTable = table_prepend_name+table;
-
-
PreparedQueryObject queryObject = new PreparedQueryObject();
String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
" UPDATE " + keyspace + "." + lockTable +
" SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
" INSERT INTO " + keyspace + "." + lockTable +
- "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
+ "(key, lockReference, createTime, acquireTime, writeLock, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
queryObject.addValue(lockRef);
queryObject.addValue(lockName);
queryObject.addValue(String.valueOf(lockEpochMillis));
queryObject.addValue("0");
queryObject.addValue(locktype==LockType.WRITE ? true : false );
+ queryObject.addValue(owner);
queryObject.appendQueryString(insQuery);
boolean pResult = dsHandle.executePut(queryObject, "critical");
if (!pResult) {// couldn't create lock ref, retry
logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
throw new MusicLockingException("Unable to create lock reference");
}
- return genLockRefandEnQueue(keyspace, table, lockName, locktype, count);
+ return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, count);
}
return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
}
-
-
- /**
+ /**
* Returns a result set containing the list of clients waiting for a particular lock
*
* @param keyspace
ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
Row row = results.one();
if (row == null || row.isNull("lockReference")) {
- return new LockObject(false, null, null, null, null);
+ return new LockObject(false, null, null, null, null, null);
}
String lockReference = "" + row.getLong("lockReference");
String createTime = row.getString("createTime");
String acquireTime = row.getString("acquireTime");
LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+ String owner = row.getString("owner");
- return new LockObject(true, lockReference, createTime, acquireTime, locktype);
+ return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
}
public List<String> getCurrentLockHolders(String keyspace, String table, String key)
String acquireTime = row.getString("acquireTime");
LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
+ String owner = row.getString("owner");
- return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype);
+ return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype, owner);
}
}
- public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference)
- throws MusicServiceException, MusicQueryException {
+ public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
table = table_prepend_name + table;
PreparedQueryObject queryObject = new PreparedQueryObject();
Long lockReferenceL = Long.parseLong(lockReference);
dsHandle.getSession().execute(updateQuery);
}
+ public boolean checkForDeadlock(String keyspace, String table, String lockName, LockType locktype, String owner, boolean forAcquire) throws MusicServiceException, MusicQueryException {
+ if (locktype.equals(LockType.READ)) return false;
+ if (owner==null || owner.length()==0) return false;
+
+ String lockTable = table_prepend_name + table;
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
+ queryObject.appendQueryString(" WHERE writelock = True ALLOW FILTERING");
+
+ DeadlockDetectionUtil ddu = new DeadlockDetectionUtil();
+
+ ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
+ logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") );
+ Iterator<Row> it = rs.iterator();
+ while (it.hasNext()) {
+ Row row = it.next();
+ logger.debug("key = " + row.getString("key") + ", time = " + row.getString("acquiretime") + ", owner = " + row.getString("owner") );
+ ddu.setExisting(row.getString("key"), row.getString("owner"), ("0".equals(row.getString("acquiretime")))?OwnershipType.CREATED:OwnershipType.ACQUIRED);
+ }
+ boolean deadlock = ddu.checkForDeadlock(lockName, owner, forAcquire?OwnershipType.ACQUIRED:OwnershipType.CREATED);
+ if (deadlock) logger.warn("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + lockTable + "." + lockName);
+ return deadlock;
+ }
+
+ public List<String> getAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicServiceException, MusicQueryException {
+ List<String> toRet = new ArrayList<String>();
+ String lockTable = table_prepend_name + table;
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ queryObject.appendQueryString("SELECT key, lockreference FROM " + keyspace + "." + lockTable);
+ queryObject.appendQueryString(" WHERE owner = '" + ownerId + "' ALLOW FILTERING");
+
+ ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
+ Iterator<Row> it = rs.iterator();
+ while (it.hasNext()) {
+ Row row = it.next();
+ toRet.add(row.getString("key") + "$" + row.getLong("lockreference"));
+ }
+ return toRet;
+ }
+
+
}
@ApiParam(value = "Authorization", required = true) @HeaderParam(MusicUtil.AUTHORIZATION) String authorization,
@ApiParam(value = "AID", required = false, hidden = true) @HeaderParam("aid") String aid,
JsonLock lockObject,
+ @ApiParam(value = "Lock Owner", required = false) @HeaderParam("owner") String owner,
@ApiParam(value = "Application namespace",
required = false, hidden = true) @HeaderParam("ns") String ns) throws Exception{
try {
}
String lockId;
try {
- lockId= MusicCore.createLockReference(lockName, locktype);
+ lockId= MusicCore.createLockReference(lockName, locktype, owner);
} catch (MusicLockingException e) {
return response.status(Status.BAD_REQUEST).entity(new JsonResponse(ResultType.FAILURE).setError(e.getMessage()).toMap()).build();
}
@ApiParam(value = "Authorization", required = true) @HeaderParam(MusicUtil.AUTHORIZATION) String authorization,
@ApiParam(value = "AID", required = false, hidden = true) @HeaderParam("aid") String aid,
@ApiParam(value = "Application namespace",
- required = false, hidden = true) @HeaderParam("ns") String ns) throws Exception{
+ required = false, hidden = true) @HeaderParam("ns") String ns) {
try {
ResponseBuilder response = MusicUtil.buildVersionResponse(VERSION, minorVersion, patchVersion);
Map<String, Object> resultMap = MusicCore.validateLock(lockName);
@ApiParam(value = "Authorization", required = true) @HeaderParam(MusicUtil.AUTHORIZATION) String authorization,
@ApiParam(value = "AID", required = false, hidden = true) @HeaderParam("aid") String aid,
@ApiParam(value = "Application namespace",
- required = false, hidden = true) @HeaderParam("ns") String ns) throws Exception{
+ required = false, hidden = true) @HeaderParam("ns") String ns) {
try {
ResponseBuilder response = MusicUtil.buildVersionResponse(VERSION, minorVersion, patchVersion);
Map<String, Object> resultMap = MusicCore.validateLock(lockName);