2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (C) 2019 IBM.
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 * ============LICENSE_END=============================================
21 * ====================================================================
24 package org.onap.music.lockingservice.cassandra;
26 import java.util.ArrayList;
27 import java.util.Iterator;
28 import java.util.List;
30 import org.onap.music.datastore.MusicDataStore;
31 import org.onap.music.datastore.PreparedQueryObject;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.exceptions.MusicLockingException;
34 import org.onap.music.exceptions.MusicQueryException;
35 import org.onap.music.exceptions.MusicServiceException;
36 import org.onap.music.main.DeadlockDetectionUtil;
37 import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
38 import org.onap.music.main.MusicUtil;
39 import org.onap.music.main.ResultType;
40 import org.onap.music.main.ReturnType;
41 import com.datastax.driver.core.ResultSet;
42 import com.datastax.driver.core.Row;
43 import com.datastax.driver.core.Session;
44 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
47 * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state.
50 public class CassaLockStore {
52 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
53 private static String table_prepend_name = "lockQ_";
54 private MusicDataStore dsHandle;
56 public CassaLockStore() {
57 dsHandle = new MusicDataStore();
60 public CassaLockStore(MusicDataStore dsHandle) {
61 this.dsHandle=dsHandle;
63 public class LockObject{
64 private boolean isLockOwner;
65 private String lockRef;
66 private String createTime;
67 private String acquireTime;
68 private LockType locktype;
69 // Owner is the self-declared client which "owns" this row. It is used for deadlock detection. It is not (directly) related to isLockOwner.
71 public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype, String owner) {
72 this.setIsLockOwner(isLockOwner);
73 this.setLockRef(lockRef);
74 this.setAcquireTime(acquireTime);
75 this.setCreateTime(createTime);
76 this.setLocktype(locktype);
79 public boolean getIsLockOwner() {
82 public void setIsLockOwner(boolean isLockOwner) {
83 this.isLockOwner = isLockOwner;
85 public String getAcquireTime() {
88 public void setAcquireTime(String acquireTime) {
89 this.acquireTime = acquireTime;
91 public String getCreateTime() {
94 public void setCreateTime(String createTime) {
95 this.createTime = createTime;
97 public String getLockRef() {
100 public void setLockRef(String lockRef) {
101 this.lockRef = lockRef;
103 public LockType getLocktype() {
106 public void setLocktype(LockType locktype) {
107 this.locktype = locktype;
109 public String getOwner() {
112 public void setOwner(String owner) {
119 * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks.
120 * @param keyspace of the application.
121 * @param table of the application.
122 * @return true if the operation was successful.
123 * @throws MusicServiceException
124 * @throws MusicQueryException
126 public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
127 logger.info(EELFLoggerDelegate.applicationLogger,
128 "Create lock queue/table for " + keyspace+"."+table);
129 table = table_prepend_name+table;
130 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
131 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
132 + "lockType text, leasePeriodTime bigint, owner text, PRIMARY KEY ((key), lockReference) ) "
133 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
134 PreparedQueryObject queryObject = new PreparedQueryObject();
136 queryObject.appendQueryString(tabQuery);
138 result = dsHandle.executePut(queryObject, "eventual");
143 * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
144 * @param keyspace of the locks.
145 * @param table of the locks.
146 * @param lockName is the primary key of the lock table
147 * @param lockType is the type of lock (read/write)
148 * @param owner is the owner of the lock (optional, for deadlock detection)
149 * @return the UUID lock reference.
150 * @throws MusicServiceException
151 * @throws MusicQueryException
153 public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner) throws MusicServiceException, MusicQueryException, MusicLockingException {
154 return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, 0);
157 private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
158 logger.info(EELFLoggerDelegate.applicationLogger,
159 "Create " + locktype + " lock reference for " + keyspace + "." + table + "." + lockName);
160 String lockTable ="";
161 lockTable = table_prepend_name+table;
163 PreparedQueryObject queryObject = new PreparedQueryObject();
164 String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
166 queryObject.addValue(lockName);
167 queryObject.appendQueryString(selectQuery);
168 ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
169 List<Row> latestGuardRow = gqResult.all();
173 if (!latestGuardRow.isEmpty()) {
174 prevGuard = latestGuardRow.get(0).getLong(0);
175 lockRef = prevGuard + 1;
178 long lockEpochMillis = System.currentTimeMillis();
180 logger.info(EELFLoggerDelegate.applicationLogger,
181 "Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
183 queryObject = new PreparedQueryObject();
185 String insQuery = "BEGIN BATCH" +
186 " UPDATE " + keyspace + "." + lockTable +
187 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
188 " INSERT INTO " + keyspace + "." + lockTable +
189 "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
191 queryObject.addValue(lockRef);
192 queryObject.addValue(lockName);
194 queryObject.addValue(prevGuard);
196 queryObject.addValue(lockName);
197 queryObject.addValue(lockRef);
198 queryObject.addValue(String.valueOf(lockEpochMillis));
199 queryObject.addValue("0");
200 queryObject.addValue(locktype);
201 queryObject.addValue(owner);
202 queryObject.appendQueryString(insQuery);
203 boolean pResult = dsHandle.executePut(queryObject, "critical");
204 if (!pResult) {// couldn't create lock ref, retry
206 if (count > MusicUtil.getRetryCount()) {
207 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
208 throw new MusicLockingException("Unable to create lock reference");
210 return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, count);
212 return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
216 * Returns a result set containing the list of clients waiting for a particular lock
221 * @return list of lockrefs in the queue
222 * @throws MusicServiceException
223 * @throws MusicQueryException
225 public List<String> getLockQueue(String keyspace, String table, String key)
226 throws MusicServiceException, MusicQueryException {
227 logger.info(EELFLoggerDelegate.applicationLogger,
228 "Getting the queue for " + keyspace + "." + table + "." + key);
229 table = table_prepend_name + table;
230 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
231 PreparedQueryObject queryObject = new PreparedQueryObject();
232 queryObject.appendQueryString(selectQuery);
233 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
234 ArrayList<String> lockQueue = new ArrayList<>();
236 lockQueue.add(Long.toString(row.getLong("lockReference")));
243 * Returns a result set containing the list of clients waiting for a particular lock
248 * @return size of lockrefs queue
249 * @throws MusicServiceException
250 * @throws MusicQueryException
252 public long getLockQueueSize(String keyspace, String table, String key)
253 throws MusicServiceException, MusicQueryException {
254 logger.info(EELFLoggerDelegate.applicationLogger,
255 "Getting the queue size for " + keyspace + "." + table + "." + key);
256 table = table_prepend_name + table;
257 String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
258 PreparedQueryObject queryObject = new PreparedQueryObject();
259 queryObject.appendQueryString(selectQuery);
260 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
261 return rs.one().getLong("count");
266 * This method returns the top of lock table/queue for the key.
268 * @param keyspace of the application.
269 * @param table of the application.
270 * @param key is the primary key of the application table
271 * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the
273 * @throws MusicServiceException
274 * @throws MusicQueryException
276 public LockObject peekLockQueue(String keyspace, String table, String key)
277 throws MusicServiceException, MusicQueryException {
278 logger.info(EELFLoggerDelegate.applicationLogger,
279 "Peek in lock table for " + keyspace + "." + table + "." + key);
280 table = table_prepend_name + table;
281 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;";
282 PreparedQueryObject queryObject = new PreparedQueryObject();
283 queryObject.appendQueryString(selectQuery);
284 ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
285 Row row = results.one();
286 if (row == null || row.isNull("lockReference")) {
287 return new LockObject(false, null, null, null, null, null);
289 String lockReference = "" + row.getLong("lockReference");
290 String createTime = row.getString("createTime");
291 String acquireTime = row.getString("acquireTime");
292 LockType locktype = row.get("lockType", LockType.class);
293 String owner = row.getString("owner");
295 return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
298 public List<String> getCurrentLockHolders(String keyspace, String table, String key)
299 throws MusicServiceException, MusicQueryException {
300 logger.info(EELFLoggerDelegate.applicationLogger,
301 "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
302 String origTable = table;
303 table = table_prepend_name + table;
304 String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
305 List<String> lockHolders = new ArrayList<>();
306 PreparedQueryObject queryObject = new PreparedQueryObject();
307 queryObject.appendQueryString(selectQuery);
308 queryObject.addValue(key);
309 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
310 boolean topOfQueue = true;
311 StringBuilder lock = new StringBuilder().
312 append("$").append(keyspace).append(".").append(origTable).
313 append(".").append(key).append("$");
314 StringBuilder lockReference = new StringBuilder();
316 if ( row.isNull("lockReference") ) {
319 lockReference.append(lock).append(row.getLong("lockReference"));
320 if (row.get("lockType", LockType.class)!=LockType.WRITE) {
322 lockHolders.add(lockReference.toString());
329 lockHolders.add(lockReference.toString());
332 lockReference.delete(0,lockReference.length());
338 * Determine if the lock is a valid current lock holder.
344 * @return true if lockRef is a lock owner of key
345 * @throws MusicServiceException
346 * @throws MusicQueryException
348 public boolean isLockOwner(String keyspace, String table, String key, String lockRef)
349 throws MusicServiceException, MusicQueryException {
350 logger.info(EELFLoggerDelegate.applicationLogger,
351 "Checking in lock table for " + keyspace + "." + table + "." + key);
352 table = table_prepend_name + table;
354 "select * from " + keyspace + "." + table + " where key=?;";
355 PreparedQueryObject queryObject = new PreparedQueryObject();
356 queryObject.appendQueryString(selectQuery);
357 queryObject.addValue(key);
358 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
360 boolean topOfQueue = true;
362 String lockReference = "" + row.getLong("lockReference");
363 if (row.get("lockType", LockType.class)==LockType.WRITE) {
364 if (topOfQueue && lockRef.equals(lockReference)) {
370 if (lockRef.equals(lockReference)) {
375 logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
376 + " in the lock queue. It has expired and no longer exists.");
381 * Determine if the lock is a valid current lock holder.
387 * @return true if lockRef is a lock owner of key
388 * @throws MusicServiceException
389 * @throws MusicQueryException
391 public LockObject getLockInfo(String keyspace, String table, String key, String lockRef)
392 throws MusicServiceException, MusicQueryException {
393 logger.info(EELFLoggerDelegate.applicationLogger,
394 "Checking in lock table for " + keyspace + "." + table + "." + key);
395 String lockQ_table = table_prepend_name + table;
397 "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;";
398 PreparedQueryObject queryObject = new PreparedQueryObject();
399 queryObject.appendQueryString(selectQuery);
400 queryObject.addValue(key);
401 queryObject.addValue(Long.parseLong(lockRef));
402 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
404 if (row == null || row.isNull("lockReference")) {
408 String lockReference = "" + row.getLong("lockReference");
409 String createTime = row.getString("createTime");
410 String acquireTime = row.getString("acquireTime");
411 LockType locktype = row.get("lockType", LockType.class);
412 boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
413 String owner = row.getString("owner");
415 return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype, owner);
421 * This method removes the lock ref from the lock table/queue for the key.
423 * @param keyspace of the application.
424 * @param table of the application.
425 * @param key is the primary key of the application table
426 * @param lockReference the lock reference that needs to be dequeued.
427 * @throws MusicServiceException
428 * @throws MusicQueryException
429 * @throws MusicLockingException
431 public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n)
432 throws MusicServiceException, MusicQueryException, MusicLockingException {
433 String prependTable = table_prepend_name + table;
434 PreparedQueryObject queryObject = new PreparedQueryObject();
435 Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1));
436 String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key
437 + "' AND lockReference =" + lockReferenceL + " IF EXISTS;";
438 queryObject.appendQueryString(deleteQuery);
439 logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
441 dsHandle.executePut(queryObject, "critical");
442 logger.info(EELFLoggerDelegate.applicationLogger,
443 "Lock removed for key: " + key + " and reference: " + lockReference);
444 } catch (MusicServiceException ex) {
445 logger.error(logger, ex.getMessage(), ex);
446 logger.error(EELFLoggerDelegate.applicationLogger,
447 "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference);
449 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
450 deQueueLockRef(keyspace, table, key, lockReference, n - 1);
452 logger.error(EELFLoggerDelegate.applicationLogger,
453 "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference);
454 logger.error(logger, ex.getMessage(), ex);
455 throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage());
461 public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
462 table = table_prepend_name + table;
463 Long lockReferenceL = Long.parseLong(lockReference);
464 String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
465 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
467 //cannot use executePut because we need to ignore music timestamp adjustments for lock store
468 dsHandle.getSession().execute(updateQuery);
471 public boolean checkForDeadlock(String keyspace, String table, String lockName, LockType locktype, String owner, boolean forAcquire) throws MusicServiceException, MusicQueryException {
472 if (locktype.equals(LockType.READ)) return false;
473 if (owner==null || owner.length()==0) return false;
475 String lockTable = table_prepend_name + table;
476 PreparedQueryObject queryObject = new PreparedQueryObject();
477 queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
478 queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING");
479 queryObject.addValue(LockType.WRITE);
481 DeadlockDetectionUtil ddu = getDeadlockDetectionUtil();
483 ResultSet rs = dsHandle.executeLocalQuorumConsistencyGet(queryObject);
484 logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") );
485 Iterator<Row> it = rs.iterator();
486 while (it.hasNext()) {
488 logger.debug("key = " + row.getString("key") + ", time = " + row.getString("acquiretime") + ", owner = " + row.getString("owner") );
489 ddu.setExisting(row.getString("key"), row.getString("owner"), ("0".equals(row.getString("acquiretime")))?OwnershipType.CREATED:OwnershipType.ACQUIRED);
491 boolean deadlock = ddu.checkForDeadlock(lockName, owner, forAcquire?OwnershipType.ACQUIRED:OwnershipType.CREATED);
492 if (deadlock) logger.warn("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + lockTable + "." + lockName);
497 * This is used for testing purpose
498 * @return new DeadlockDetectionUtil object
500 DeadlockDetectionUtil getDeadlockDetectionUtil() {
501 return new DeadlockDetectionUtil();
504 public List<String> getAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicServiceException, MusicQueryException {
505 List<String> toRet = new ArrayList<String>();
506 String lockTable = table_prepend_name + table;
507 PreparedQueryObject queryObject = new PreparedQueryObject();
508 queryObject.appendQueryString("SELECT key, lockreference FROM " + keyspace + "." + lockTable);
509 queryObject.appendQueryString(" WHERE owner = '" + ownerId + "' ALLOW FILTERING");
511 ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
512 Iterator<Row> it = rs.iterator();
513 while (it.hasNext()) {
515 toRet.add(row.getString("key") + "$" + row.getLong("lockreference"));
520 public ReturnType promoteLock(String keyspace, String table, String key, String lockRef)
521 throws MusicLockingException, MusicServiceException, MusicQueryException {
522 String lockqtable = table_prepend_name + table;
523 String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;";
525 PreparedQueryObject queryObject = new PreparedQueryObject();
526 queryObject.appendQueryString(selectQuery);
527 queryObject.addValue(key);
528 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
530 long refToPromote = Long.parseLong(lockRef);
532 boolean topOfQueue = true;
533 boolean readBlock = false;
534 boolean seenLockToPromote = false;
535 boolean promotionOngoing = false;
536 long readBlockStart = 0;
537 long readBlockEnd = 0;
541 long ref = row.getLong("lockreference");
542 LockType lockType = row.get("lockType", LockType.class);
544 if (refToPromote==ref) {
545 if (promotionOngoing) {
546 return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref.");
548 seenLockToPromote = true;
550 readBlockStart = ref;
554 } else if (!seenLockToPromote && refToPromote<ref) {
555 return new ReturnType(ResultType.FAILURE, "Lockref does not exist.");
558 if (lockType==LockType.READ || lockType==LockType.PROMOTING) {
560 readBlockStart = ref;
566 if (lockType==LockType.PROMOTING) {
567 promotionOngoing = true;
571 if (lockType==LockType.WRITE) {
572 if (refToPromote==ref) {
573 return new ReturnType(ResultType.FAILURE, "Lockref is already write.");
577 promotionOngoing = false;
578 if (seenLockToPromote) {
581 //can no longer be lock holder after this
587 if (readBlockStart<=refToPromote && refToPromote<=readBlockEnd) {
588 if (readBlockStart==refToPromote && refToPromote==readBlockEnd) {
589 promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.WRITE);
590 return new ReturnType(ResultType.SUCCESS, "Lock has successfully been upgraded.");
592 promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.PROMOTING);
593 return new ReturnType(ResultType.FAILURE, "Your lock upgrade is in progress. Check again to see if successful.");
596 //shouldn't reach here?
597 return new ReturnType(ResultType.FAILURE,"Promotion failed.");
600 private void promoteLockTo(String keyspace, String table, String key, String lockRef, LockType newLockType)
601 throws MusicServiceException, MusicQueryException {
602 PreparedQueryObject queryObject =
603 new PreparedQueryObject("UPDATE " + keyspace + "." + table + " SET lockType=? WHERE key='" + key
604 + "' AND lockReference = " + lockRef + " IF EXISTS;", newLockType);
606 //cannot use executePut because we need to ignore music timestamp adjustments for lock store
607 dsHandle.executePut(queryObject, MusicUtil.QUORUM);