2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (C) 2019 IBM.
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 * ============LICENSE_END=============================================
21 * ====================================================================
24 package org.onap.music.lockingservice.cassandra;
26 import java.util.ArrayList;
27 import java.util.Iterator;
28 import java.util.List;
30 import org.onap.music.datastore.MusicDataStore;
31 import org.onap.music.datastore.PreparedQueryObject;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.exceptions.MusicLockingException;
34 import org.onap.music.exceptions.MusicQueryException;
35 import org.onap.music.exceptions.MusicServiceException;
36 import org.onap.music.main.DeadlockDetectionUtil;
37 import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
38 import org.onap.music.main.MusicUtil;
39 import org.onap.music.main.ResultType;
40 import org.onap.music.main.ReturnType;
41 import com.datastax.driver.core.ResultSet;
42 import com.datastax.driver.core.Row;
45 * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state.
48 public class CassaLockStore {
50 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
51 public static final String table_prepend_name = "lockQ_";
52 private MusicDataStore dsHandle;
54 public CassaLockStore() {
55 dsHandle = new MusicDataStore();
58 public CassaLockStore(MusicDataStore dsHandle) {
59 this.dsHandle=dsHandle;
61 public class LockObject{
62 private boolean isLockOwner;
63 private String lockRef;
64 private String createTime;
65 private String acquireTime;
66 private LockType locktype;
67 // Owner is the self-declared client which "owns" this row. It is used for deadlock detection. It is not (directly) related to isLockOwner.
69 public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype, String owner) {
70 this.setIsLockOwner(isLockOwner);
71 this.setLockRef(lockRef);
72 this.setAcquireTime(acquireTime);
73 this.setCreateTime(createTime);
74 this.setLocktype(locktype);
77 public boolean getIsLockOwner() {
80 public void setIsLockOwner(boolean isLockOwner) {
81 this.isLockOwner = isLockOwner;
83 public String getAcquireTime() {
86 public void setAcquireTime(String acquireTime) {
87 this.acquireTime = acquireTime;
89 public String getCreateTime() {
92 public void setCreateTime(String createTime) {
93 this.createTime = createTime;
95 public String getLockRef() {
98 public void setLockRef(String lockRef) {
99 this.lockRef = lockRef;
101 public LockType getLocktype() {
104 public void setLocktype(LockType locktype) {
105 this.locktype = locktype;
107 public String getOwner() {
110 public void setOwner(String owner) {
117 * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks.
118 * @param keyspace of the application.
119 * @param table of the application.
120 * @return true if the operation was successful.
121 * @throws MusicServiceException
122 * @throws MusicQueryException
124 public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
125 logger.info(EELFLoggerDelegate.applicationLogger,
126 "Create lock queue/table for " + keyspace+"."+table);
127 table = table_prepend_name+table;
128 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
129 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
130 + "lockType text, leasePeriodTime bigint, owner text, PRIMARY KEY ((key), lockReference) ) "
131 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
132 PreparedQueryObject queryObject = new PreparedQueryObject();
134 queryObject.appendQueryString(tabQuery);
136 result = dsHandle.executePut(queryObject, "eventual");
141 * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
142 * @param keyspace of the locks.
143 * @param table of the locks.
144 * @param lockName is the primary key of the lock table
145 * @param lockType is the type of lock (read/write)
146 * @param owner is the owner of the lock (optional, for deadlock detection)
147 * @return the UUID lock reference.
148 * @throws MusicServiceException
149 * @throws MusicQueryException
151 public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner) throws MusicServiceException, MusicQueryException, MusicLockingException {
152 return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, 0);
155 private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
156 logger.info(EELFLoggerDelegate.applicationLogger,
157 "Create " + locktype + " lock reference for " + keyspace + "." + table + "." + lockName);
158 String lockTable ="";
159 lockTable = table_prepend_name+table;
161 PreparedQueryObject queryObject = new PreparedQueryObject();
162 String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
164 queryObject.addValue(lockName);
165 queryObject.appendQueryString(selectQuery);
166 ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
167 List<Row> latestGuardRow = gqResult.all();
171 if (!latestGuardRow.isEmpty()) {
172 prevGuard = latestGuardRow.get(0).getLong(0);
173 lockRef = prevGuard + 1;
176 long lockEpochMillis = System.currentTimeMillis();
178 logger.info(EELFLoggerDelegate.applicationLogger,
179 "Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
181 queryObject = new PreparedQueryObject();
183 String insQuery = "BEGIN BATCH" +
184 " UPDATE " + keyspace + "." + lockTable +
185 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
186 " INSERT INTO " + keyspace + "." + lockTable +
187 "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
189 queryObject.addValue(lockRef);
190 queryObject.addValue(lockName);
192 queryObject.addValue(prevGuard);
194 queryObject.addValue(lockName);
195 queryObject.addValue(lockRef);
196 queryObject.addValue(String.valueOf(lockEpochMillis));
197 queryObject.addValue("0");
198 queryObject.addValue(locktype);
199 queryObject.addValue(owner);
200 queryObject.appendQueryString(insQuery);
201 boolean pResult = dsHandle.executePut(queryObject, "critical");
202 if (!pResult) {// couldn't create lock ref, retry
204 if (count > MusicUtil.getRetryCount()) {
205 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
206 throw new MusicLockingException("Unable to create lock reference");
208 return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, count);
210 return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
214 * Returns a result set containing the list of clients waiting for a particular lock
219 * @return list of lockrefs in the queue
220 * @throws MusicServiceException
221 * @throws MusicQueryException
223 public List<String> getLockQueue(String keyspace, String table, String key)
224 throws MusicServiceException, MusicQueryException {
225 logger.info(EELFLoggerDelegate.applicationLogger,
226 "Getting the queue for " + keyspace + "." + table + "." + key);
227 table = table_prepend_name + table;
228 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
229 PreparedQueryObject queryObject = new PreparedQueryObject();
230 queryObject.appendQueryString(selectQuery);
231 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
232 ArrayList<String> lockQueue = new ArrayList<>();
234 lockQueue.add(Long.toString(row.getLong("lockReference")));
241 * Returns a result set containing the list of clients waiting for a particular lock
246 * @return size of lockrefs queue
247 * @throws MusicServiceException
248 * @throws MusicQueryException
250 public long getLockQueueSize(String keyspace, String table, String key)
251 throws MusicServiceException, MusicQueryException {
252 logger.info(EELFLoggerDelegate.applicationLogger,
253 "Getting the queue size for " + keyspace + "." + table + "." + key);
254 table = table_prepend_name + table;
255 String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
256 PreparedQueryObject queryObject = new PreparedQueryObject();
257 queryObject.appendQueryString(selectQuery);
258 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
259 return rs.one().getLong("count");
264 * This method returns the top of lock table/queue for the key.
266 * @param keyspace of the application.
267 * @param table of the application.
268 * @param key is the primary key of the application table
269 * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the
271 * @throws MusicServiceException
272 * @throws MusicQueryException
274 public LockObject peekLockQueue(String keyspace, String table, String key)
275 throws MusicServiceException, MusicQueryException {
276 logger.info(EELFLoggerDelegate.applicationLogger,
277 "Peek in lock table for " + keyspace + "." + table + "." + key);
278 table = table_prepend_name + table;
279 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;";
280 PreparedQueryObject queryObject = new PreparedQueryObject();
281 queryObject.appendQueryString(selectQuery);
282 ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
283 Row row = results.one();
284 if (row == null || row.isNull("lockReference")) {
285 return new LockObject(false, null, null, null, null, null);
287 String lockReference = "" + row.getLong("lockReference");
288 String createTime = row.getString("createTime");
289 String acquireTime = row.getString("acquireTime");
290 LockType locktype = row.get("lockType", LockType.class);
291 String owner = row.getString("owner");
293 return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
296 public List<String> getCurrentLockHolders(String keyspace, String table, String key)
297 throws MusicServiceException, MusicQueryException {
298 logger.info(EELFLoggerDelegate.applicationLogger,
299 "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
300 String origTable = table;
301 table = table_prepend_name + table;
302 String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
303 List<String> lockHolders = new ArrayList<>();
304 PreparedQueryObject queryObject = new PreparedQueryObject();
305 queryObject.appendQueryString(selectQuery);
306 queryObject.addValue(key);
307 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
308 boolean topOfQueue = true;
309 StringBuilder lock = new StringBuilder().
310 append("$").append(keyspace).append(".").append(origTable).
311 append(".").append(key).append("$");
312 StringBuilder lockReference = new StringBuilder();
314 if ( row.isNull("lockReference") ) {
317 lockReference.append(lock).append(row.getLong("lockReference"));
318 if (row.get("lockType", LockType.class)!=LockType.WRITE) {
320 lockHolders.add(lockReference.toString());
327 lockHolders.add(lockReference.toString());
330 lockReference.delete(0,lockReference.length());
336 * Determine if the lock is a valid current lock holder.
342 * @return true if lockRef is a lock owner of key
343 * @throws MusicServiceException
344 * @throws MusicQueryException
346 public boolean isLockOwner(String keyspace, String table, String key, String lockRef)
347 throws MusicServiceException, MusicQueryException {
348 logger.info(EELFLoggerDelegate.applicationLogger,
349 "Checking in lock table for " + keyspace + "." + table + "." + key);
350 table = table_prepend_name + table;
352 "select * from " + keyspace + "." + table + " where key=?;";
353 PreparedQueryObject queryObject = new PreparedQueryObject();
354 queryObject.appendQueryString(selectQuery);
355 queryObject.addValue(key);
356 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
358 boolean topOfQueue = true;
360 String lockReference = "" + row.getLong("lockReference");
361 if (row.get("lockType", LockType.class)==LockType.WRITE) {
362 if (topOfQueue && lockRef.equals(lockReference)) {
368 if (lockRef.equals(lockReference)) {
373 logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
374 + " in the lock queue. It has expired and no longer exists.");
379 * Determine if the lock is a valid current lock holder.
385 * @return true if lockRef is a lock owner of key
386 * @throws MusicServiceException
387 * @throws MusicQueryException
389 public LockObject getLockInfo(String keyspace, String table, String key, String lockRef)
390 throws MusicServiceException, MusicQueryException {
391 logger.info(EELFLoggerDelegate.applicationLogger,
392 "Checking in lock table for " + keyspace + "." + table + "." + key);
393 String lockQ_table = table_prepend_name + table;
395 "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;";
396 PreparedQueryObject queryObject = new PreparedQueryObject();
397 queryObject.appendQueryString(selectQuery);
398 queryObject.addValue(key);
399 queryObject.addValue(Long.parseLong(lockRef));
400 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
402 if (row == null || row.isNull("lockReference")) {
406 String lockReference = "" + row.getLong("lockReference");
407 String createTime = row.getString("createTime");
408 String acquireTime = row.getString("acquireTime");
409 LockType locktype = row.get("lockType", LockType.class);
410 boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
411 String owner = row.getString("owner");
413 return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype, owner);
419 * This method removes the lock ref from the lock table/queue for the key.
421 * @param keyspace of the application.
422 * @param table of the application.
423 * @param key is the primary key of the application table
424 * @param lockReference the lock reference that needs to be dequeued.
425 * @throws MusicServiceException
426 * @throws MusicQueryException
427 * @throws MusicLockingException
429 public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n)
430 throws MusicServiceException, MusicQueryException, MusicLockingException {
431 String prependTable = table_prepend_name + table;
432 PreparedQueryObject queryObject = new PreparedQueryObject();
433 Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1));
434 String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key
435 + "' AND lockReference =" + lockReferenceL + " IF EXISTS;";
436 queryObject.appendQueryString(deleteQuery);
437 logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
439 dsHandle.executePut(queryObject, "critical");
440 logger.info(EELFLoggerDelegate.applicationLogger,
441 "Lock removed for key: " + key + " and reference: " + lockReference);
442 } catch (MusicServiceException ex) {
443 logger.error(logger, ex.getMessage(), ex);
444 logger.error(EELFLoggerDelegate.applicationLogger,
445 "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference);
447 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
448 deQueueLockRef(keyspace, table, key, lockReference, n - 1);
450 logger.error(EELFLoggerDelegate.applicationLogger,
451 "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference);
452 logger.error(logger, ex.getMessage(), ex);
453 throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage());
459 public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
460 table = table_prepend_name + table;
461 Long lockReferenceL = Long.parseLong(lockReference);
462 String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
463 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
465 //cannot use executePut because we need to ignore music timestamp adjustments for lock store
466 dsHandle.getSession().execute(updateQuery);
469 public boolean checkForDeadlock(String keyspace, String table, String lockName, LockType locktype, String owner, boolean forAcquire) throws MusicServiceException, MusicQueryException {
470 if (locktype.equals(LockType.READ))
472 if (owner==null || owner.length()==0)
475 String lockTable = table_prepend_name + table;
476 PreparedQueryObject queryObject = new PreparedQueryObject();
477 queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
478 queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING");
479 queryObject.addValue(LockType.WRITE);
481 DeadlockDetectionUtil ddu = getDeadlockDetectionUtil();
483 ResultSet rs = dsHandle.executeLocalQuorumConsistencyGet(queryObject);
484 logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") );
485 Iterator<Row> it = rs.iterator();
486 while (it.hasNext()) {
488 logger.debug("key = " + row.getString("key") + ", time = " + row.getString("acquiretime") + ", owner = " + row.getString("owner") );
489 ddu.setExisting(row.getString("key"), row.getString("owner"), ("0".equals(row.getString("acquiretime")))?OwnershipType.CREATED:OwnershipType.ACQUIRED);
491 boolean deadlock = ddu.checkForDeadlock(lockName, owner, forAcquire?OwnershipType.ACQUIRED:OwnershipType.CREATED);
493 logger.warn("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + lockTable + "." + lockName);
498 * This is used for testing purpose
499 * @return new DeadlockDetectionUtil object
501 DeadlockDetectionUtil getDeadlockDetectionUtil() {
502 return new DeadlockDetectionUtil();
505 public List<String> getAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicServiceException, MusicQueryException {
506 List<String> toRet = new ArrayList<String>();
507 String lockTable = table_prepend_name + table;
508 PreparedQueryObject queryObject = new PreparedQueryObject();
509 queryObject.appendQueryString("SELECT key, lockreference FROM " + keyspace + "." + lockTable);
510 queryObject.appendQueryString(" WHERE owner = '" + ownerId + "' ALLOW FILTERING");
512 ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
513 Iterator<Row> it = rs.iterator();
514 while (it.hasNext()) {
516 toRet.add(row.getString("key") + "$" + row.getLong("lockreference"));
521 public ReturnType promoteLock(String keyspace, String table, String key, String lockRef)
522 throws MusicServiceException, MusicQueryException {
523 String lockqtable = table_prepend_name + table;
524 String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;";
526 PreparedQueryObject queryObject = new PreparedQueryObject();
527 queryObject.appendQueryString(selectQuery);
528 queryObject.addValue(key);
529 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
531 long refToPromote = Long.parseLong(lockRef);
533 boolean topOfQueue = true;
534 boolean readBlock = false;
535 boolean seenLockToPromote = false;
536 boolean promotionOngoing = false;
537 long readBlockStart = 0;
538 long readBlockEnd = 0;
542 long ref = row.getLong("lockreference");
543 LockType lockType = row.get("lockType", LockType.class);
545 if (refToPromote==ref) {
546 if (promotionOngoing) {
547 return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref.");
549 seenLockToPromote = true;
551 readBlockStart = ref;
555 } else if (!seenLockToPromote && refToPromote<ref) {
556 return new ReturnType(ResultType.FAILURE, "Lockref does not exist.");
559 if (lockType==LockType.READ || lockType==LockType.PROMOTING) {
561 readBlockStart = ref;
567 if (lockType==LockType.PROMOTING) {
568 promotionOngoing = true;
572 if (lockType==LockType.WRITE) {
573 if (refToPromote==ref) {
574 return new ReturnType(ResultType.FAILURE, "Lockref is already write.");
578 promotionOngoing = false;
579 if (seenLockToPromote) {
582 //can no longer be lock holder after this
588 if (readBlockStart<=refToPromote && refToPromote<=readBlockEnd) {
589 if (readBlockStart==refToPromote && refToPromote==readBlockEnd) {
590 promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.WRITE);
591 return new ReturnType(ResultType.SUCCESS, "Lock has successfully been upgraded.");
593 promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.PROMOTING);
594 return new ReturnType(ResultType.FAILURE, "Your lock upgrade is in progress. Check again to see if successful.");
597 //shouldn't reach here?
598 return new ReturnType(ResultType.FAILURE,"Promotion failed.");
601 private void promoteLockTo(String keyspace, String table, String key, String lockRef, LockType newLockType)
602 throws MusicServiceException, MusicQueryException {
603 PreparedQueryObject queryObject =
604 new PreparedQueryObject("UPDATE " + keyspace + "." + table + " SET lockType=? WHERE key='" + key
605 + "' AND lockReference = " + lockRef + " IF EXISTS;", newLockType);
607 //cannot use executePut because we need to ignore music timestamp adjustments for lock store
608 dsHandle.executePut(queryObject, MusicUtil.QUORUM);