/*
 * ============LICENSE_START==========================================
 * ===================================================================
 *  Copyright (c) 2017 AT&T Intellectual Property
 *  Modifications Copyright (C) 2019 IBM.
 * ===================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 * ============LICENSE_END=============================================
 * ====================================================================
 */
24 package org.onap.music.lockingservice.cassandra;
26 import java.util.ArrayList;
27 import java.util.Iterator;
28 import java.util.List;
30 import org.onap.music.datastore.MusicDataStore;
31 import org.onap.music.datastore.PreparedQueryObject;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.exceptions.MusicLockingException;
34 import org.onap.music.exceptions.MusicQueryException;
35 import org.onap.music.exceptions.MusicServiceException;
36 import org.onap.music.main.DeadlockDetectionUtil;
37 import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
38 import org.onap.music.main.MusicUtil;
39 import org.onap.music.main.ResultType;
40 import org.onap.music.main.ReturnType;
41 import com.datastax.driver.core.ResultSet;
42 import com.datastax.driver.core.Row;
/**
 * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state.
 */
48 public class CassaLockStore {
50 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
51 public static final String table_prepend_name = "lockQ_";
52 private MusicDataStore dsHandle;
/**
 * Creates a lock store backed by a newly constructed {@link MusicDataStore}.
 */
public CassaLockStore() {
    this.dsHandle = new MusicDataStore();
}

/**
 * Creates a lock store that sends every lock-table query through the supplied handle.
 *
 * @param dsHandle data store handle used for all reads and writes of this lock store
 */
public CassaLockStore(MusicDataStore dsHandle) {
    this.dsHandle = dsHandle;
}
61 public class LockObject{
62 private boolean isLockOwner;
63 private String lockRef;
64 private String createTime;
65 private String acquireTime;
66 private LockType locktype;
67 // Owner is the self-declared client which "owns" this row. It is used for deadlock detection. It is not (directly) related to isLockOwner.
69 public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype, String owner) {
70 this.setIsLockOwner(isLockOwner);
71 this.setLockRef(lockRef);
72 this.setAcquireTime(acquireTime);
73 this.setCreateTime(createTime);
74 this.setLocktype(locktype);
77 public boolean getIsLockOwner() {
80 public void setIsLockOwner(boolean isLockOwner) {
81 this.isLockOwner = isLockOwner;
83 public String getAcquireTime() {
86 public void setAcquireTime(String acquireTime) {
87 this.acquireTime = acquireTime;
89 public String getCreateTime() {
92 public void setCreateTime(String createTime) {
93 this.createTime = createTime;
95 public String getLockRef() {
98 public void setLockRef(String lockRef) {
99 this.lockRef = lockRef;
101 public LockType getLocktype() {
104 public void setLocktype(LockType locktype) {
105 this.locktype = locktype;
107 public String getOwner() {
110 public void setOwner(String owner) {
117 * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks.
118 * @param keyspace of the application.
119 * @param table of the application.
120 * @return true if the operation was successful.
121 * @throws MusicServiceException
122 * @throws MusicQueryException
124 public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
125 logger.info(EELFLoggerDelegate.applicationLogger,
126 "Create lock queue/table for " + keyspace+"."+table);
127 table = table_prepend_name+table;
128 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
129 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
130 + "lockType text, leasePeriodTime bigint, owner text, PRIMARY KEY ((key), lockReference) ) "
131 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
132 PreparedQueryObject queryObject = new PreparedQueryObject();
134 queryObject.appendQueryString(tabQuery);
136 result = dsHandle.executePut(queryObject, "eventual");
141 * This method creates a lock reference for each invocation. The lock references are monotonically increasing timestamps.
142 * @param keyspace of the locks.
143 * @param table of the locks.
144 * @param lockName is the primary key of the lock table
145 * @param lockType is the type of lock (read/write)
146 * @param owner is the owner of the lock (optional, for deadlock detection)
147 * @return the UUID lock reference.
148 * @throws MusicServiceException
149 * @throws MusicQueryException
151 public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner) throws MusicServiceException, MusicQueryException, MusicLockingException {
152 return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, 0);
155 private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
156 logger.info(EELFLoggerDelegate.applicationLogger,
157 "Create " + locktype + " lock reference for " + keyspace + "." + table + "." + lockName);
158 String lockTable ="";
159 lockTable = table_prepend_name+table;
161 PreparedQueryObject queryObject = new PreparedQueryObject();
162 String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
164 queryObject.addValue(lockName);
165 queryObject.appendQueryString(selectQuery);
166 ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
167 List<Row> latestGuardRow = gqResult.all();
171 if (!latestGuardRow.isEmpty()) {
172 prevGuard = latestGuardRow.get(0).getLong(0);
173 lockRef = prevGuard + 1;
176 long lockEpochMillis = System.currentTimeMillis();
178 logger.info(EELFLoggerDelegate.applicationLogger,
179 "Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
181 queryObject = new PreparedQueryObject();
183 String insQuery = "BEGIN BATCH" +
184 " UPDATE " + keyspace + "." + lockTable +
185 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
186 " INSERT INTO " + keyspace + "." + lockTable +
187 "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
189 queryObject.addValue(lockRef);
190 queryObject.addValue(lockName);
192 queryObject.addValue(prevGuard);
194 queryObject.addValue(lockName);
195 queryObject.addValue(lockRef);
196 queryObject.addValue(String.valueOf(lockEpochMillis));
197 queryObject.addValue("0");
198 queryObject.addValue(locktype);
199 queryObject.addValue(owner);
200 queryObject.appendQueryString(insQuery);
201 boolean pResult = dsHandle.executePut(queryObject, "critical");
202 if (!pResult) {// couldn't create lock ref, retry
204 if (count > MusicUtil.getRetryCount()) {
205 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
206 throw new MusicLockingException("Unable to create lock reference");
208 return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, count);
210 return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
214 * Returns a result set containing the list of clients waiting for a particular lock
219 * @return list of lockrefs in the queue
220 * @throws MusicServiceException
221 * @throws MusicQueryException
223 public List<String> getLockQueue(String keyspace, String table, String key)
224 throws MusicServiceException, MusicQueryException {
225 logger.info(EELFLoggerDelegate.applicationLogger,
226 "Getting the queue for " + keyspace + "." + table + "." + key);
227 table = table_prepend_name + table;
228 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
229 PreparedQueryObject queryObject = new PreparedQueryObject();
230 queryObject.appendQueryString(selectQuery);
231 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
232 ArrayList<String> lockQueue = new ArrayList<>();
234 lockQueue.add(Long.toString(row.getLong("lockReference")));
241 * Returns a result set containing the list of clients waiting for a particular lock
246 * @return size of lockrefs queue
247 * @throws MusicServiceException
248 * @throws MusicQueryException
250 public long getLockQueueSize(String keyspace, String table, String key)
251 throws MusicServiceException, MusicQueryException {
252 logger.info(EELFLoggerDelegate.applicationLogger,
253 "Getting the queue size for " + keyspace + "." + table + "." + key);
254 table = table_prepend_name + table;
255 String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
256 PreparedQueryObject queryObject = new PreparedQueryObject();
257 queryObject.appendQueryString(selectQuery);
258 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
259 return rs.one().getLong("count");
264 * This method returns the top of lock table/queue for the key.
266 * @param keyspace of the application.
267 * @param table of the application.
268 * @param key is the primary key of the application table
269 * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the
271 * @throws MusicServiceException
272 * @throws MusicQueryException
274 public LockObject peekLockQueue(String keyspace, String table, String key)
275 throws MusicServiceException, MusicQueryException {
276 logger.info(EELFLoggerDelegate.applicationLogger,
277 "Peek in lock table for " + keyspace + "." + table + "." + key);
278 table = table_prepend_name + table;
279 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;";
280 PreparedQueryObject queryObject = new PreparedQueryObject();
281 queryObject.appendQueryString(selectQuery);
282 ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
283 Row row = results.one();
284 if (row == null || row.isNull("lockReference")) {
285 return new LockObject(false, null, null, null, null, null);
287 String lockReference = "" + row.getLong("lockReference");
288 String createTime = row.getString("createTime");
289 String acquireTime = row.getString("acquireTime");
290 LockType locktype = row.get("lockType", LockType.class);
291 String owner = row.getString("owner");
293 return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
// Collects into lockHolders the full lock references ("$keyspace.table.key$<lockReference>")
// of rows read from the lockQ_ table for the given key.
// NOTE(review): this listing carries stale line numbers and is missing lines (the row loop
// header, the statements inside the branches below, the final return and closing braces).
// The exact selection/break logic cannot be read from here - restore it from the full file.
296 public List<String> getCurrentLockHolders(String keyspace, String table, String key)
297 throws MusicServiceException, MusicQueryException {
298 logger.info(EELFLoggerDelegate.applicationLogger,
299 "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
// Keep the unprefixed table name: it is used in the references, while the query goes
// against the prefixed lock table.
300 String origTable = table;
301 table = table_prepend_name + table;
302 String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
303 List<String> lockHolders = new ArrayList<>();
304 PreparedQueryObject queryObject = new PreparedQueryObject();
305 queryObject.appendQueryString(selectQuery);
306 queryObject.addValue(key);
307 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
308 boolean topOfQueue = true;
// Common prefix "$keyspace.origTable.key$" shared by every reference built below.
309 StringBuilder lock = new StringBuilder().
310 append("$").append(keyspace).append(".").append(origTable).
311 append(".").append(key).append("$");
// Reusable buffer for the reference of the current row; it is emptied further down.
312 StringBuilder lockReference = new StringBuilder();
// A row whose lockReference is null is handled specially (body not visible in this listing).
314 if ( row.isNull("lockReference") ) {
317 lockReference.append(lock).append(row.getLong("lockReference"));
// NOTE(review): isLockOwner in this file treats "== LockType.WRITE" as the exclusive case;
// the "!=" here looks inverted relative to that - confirm against the full file.
318 if (row.get("lockType", LockType.class)!=LockType.WRITE) {
320 lockHolders.add(lockReference.toString());
327 lockHolders.add(lockReference.toString());
// Empty the buffer so it can be reused.
330 lockReference.delete(0,lockReference.length());
336 * Determine if the lock is a valid current lock holder.
342 * @return true if lockRef is a lock owner of key
343 * @throws MusicServiceException
344 * @throws MusicQueryException
346 public boolean isLockOwner(String keyspace, String table, String key, String lockRef)
347 throws MusicServiceException, MusicQueryException {
348 logger.info(EELFLoggerDelegate.applicationLogger,
349 "Checking in lock table for " + keyspace + "." + table + "." + key);
350 table = table_prepend_name + table;
352 "select * from " + keyspace + "." + table + " where key=?;";
353 PreparedQueryObject queryObject = new PreparedQueryObject();
354 queryObject.appendQueryString(selectQuery);
355 queryObject.addValue(key);
356 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
358 boolean topOfQueue = true;
360 String lockReference = "" + row.getLong("lockReference");
361 if (row.get("lockType", LockType.class)==LockType.WRITE) {
362 if (topOfQueue && lockRef.equals(lockReference)) {
368 if (lockRef.equals(lockReference)) {
373 logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
374 + " in the lock queue. It has expired and no longer exists.");
379 * Determine if the lock is a valid current lock holder.
385 * @return true if lockRef is a lock owner of key
386 * @throws MusicServiceException
387 * @throws MusicQueryException
389 public LockObject getLockInfo(String keyspace, String table, String key, String lockRef)
390 throws MusicServiceException, MusicQueryException {
391 logger.info(EELFLoggerDelegate.applicationLogger,
392 "Checking in lock table for " + keyspace + "." + table + "." + key);
393 String lockQ_table = table_prepend_name + table;
395 "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;";
396 PreparedQueryObject queryObject = new PreparedQueryObject();
397 queryObject.appendQueryString(selectQuery);
398 queryObject.addValue(key);
399 queryObject.addValue(Long.parseLong(lockRef));
400 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
402 if (row == null || row.isNull("lockReference")) {
406 String lockReference = "" + row.getLong("lockReference");
407 String createTime = row.getString("createTime");
408 String acquireTime = row.getString("acquireTime");
409 LockType locktype = row.get("lockType", LockType.class);
410 boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
411 String owner = row.getString("owner");
413 return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype, owner);
419 * This method removes the lock ref from the lock table/queue for the key.
421 * @param keyspace of the application.
422 * @param table of the application.
423 * @param key is the primary key of the application table
424 * @param lockReference the lock reference that needs to be dequeued.
425 * @throws MusicServiceException
426 * @throws MusicQueryException
427 * @throws MusicLockingException
429 public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n)
430 throws MusicServiceException, MusicQueryException, MusicLockingException {
431 String prependTable = table_prepend_name + table;
432 PreparedQueryObject queryObject = new PreparedQueryObject();
433 Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1));
434 String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key
435 + "' AND lockReference =" + lockReferenceL + " IF EXISTS;";
436 queryObject.appendQueryString(deleteQuery);
437 logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
439 dsHandle.executePut(queryObject, "critical");
440 logger.info(EELFLoggerDelegate.applicationLogger,
441 "Lock removed for key: " + key + " and reference: " + lockReference);
442 } catch (MusicServiceException ex) {
443 logger.error(logger, ex.getMessage(), ex);
444 logger.error(EELFLoggerDelegate.applicationLogger,
445 "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference);
447 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
448 deQueueLockRef(keyspace, table, key, lockReference, n - 1);
450 logger.error(EELFLoggerDelegate.applicationLogger,
451 "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference);
452 logger.error(logger, ex.getMessage(), ex);
453 throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage());
459 public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
460 table = table_prepend_name + table;
461 Long lockReferenceL = Long.parseLong(lockReference);
462 String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
463 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
465 //cannot use executePut because we need to ignore music timestamp adjustments for lock store
466 dsHandle.getSession().execute(updateQuery);
469 public boolean checkForDeadlock(String keyspace, String table, String lockName, LockType locktype, String owner, boolean forAcquire) throws MusicServiceException, MusicQueryException {
470 if (locktype.equals(LockType.READ)) return false;
471 if (owner==null || owner.length()==0) return false;
473 String lockTable = table_prepend_name + table;
474 PreparedQueryObject queryObject = new PreparedQueryObject();
475 queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
476 queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING");
477 queryObject.addValue(LockType.WRITE);
479 DeadlockDetectionUtil ddu = getDeadlockDetectionUtil();
481 ResultSet rs = dsHandle.executeLocalQuorumConsistencyGet(queryObject);
482 logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") );
483 Iterator<Row> it = rs.iterator();
484 while (it.hasNext()) {
486 logger.debug("key = " + row.getString("key") + ", time = " + row.getString("acquiretime") + ", owner = " + row.getString("owner") );
487 ddu.setExisting(row.getString("key"), row.getString("owner"), ("0".equals(row.getString("acquiretime")))?OwnershipType.CREATED:OwnershipType.ACQUIRED);
489 boolean deadlock = ddu.checkForDeadlock(lockName, owner, forAcquire?OwnershipType.ACQUIRED:OwnershipType.CREATED);
490 if (deadlock) logger.warn("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + lockTable + "." + lockName);
495 * This is used for testing purpose
496 * @return new DeadlockDetectionUtil object
498 DeadlockDetectionUtil getDeadlockDetectionUtil() {
499 return new DeadlockDetectionUtil();
502 public List<String> getAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicServiceException, MusicQueryException {
503 List<String> toRet = new ArrayList<String>();
504 String lockTable = table_prepend_name + table;
505 PreparedQueryObject queryObject = new PreparedQueryObject();
506 queryObject.appendQueryString("SELECT key, lockreference FROM " + keyspace + "." + lockTable);
507 queryObject.appendQueryString(" WHERE owner = '" + ownerId + "' ALLOW FILTERING");
509 ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
510 Iterator<Row> it = rs.iterator();
511 while (it.hasNext()) {
513 toRet.add(row.getString("key") + "$" + row.getLong("lockreference"));
// Attempts to upgrade the queue entry lockRef of the given key to a WRITE lock. The queue rows
// are scanned while tracking a block of READ/PROMOTING entries (readBlockStart..readBlockEnd);
// the result is reported as a ReturnType carrying SUCCESS or FAILURE and a message.
// NOTE(review): this listing carries stale line numbers and is missing lines (the row loop
// header, several readBlock/readBlockEnd/topOfQueue updates, breaks and closing braces).
// The comments below describe only what the visible lines show - restore the full method
// from the original file before relying on it.
518 public ReturnType promoteLock(String keyspace, String table, String key, String lockRef)
519 throws MusicLockingException, MusicServiceException, MusicQueryException {
520 String lockqtable = table_prepend_name + table;
521 String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;";
523 PreparedQueryObject queryObject = new PreparedQueryObject();
524 queryObject.appendQueryString(selectQuery);
525 queryObject.addValue(key);
526 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
// lockRef must be the bare numeric reference; a non-numeric value makes parseLong throw.
528 long refToPromote = Long.parseLong(lockRef);
// Scan state, updated per row.
530 boolean topOfQueue = true;
531 boolean readBlock = false;
532 boolean seenLockToPromote = false;
533 boolean promotionOngoing = false;
534 long readBlockStart = 0;
535 long readBlockEnd = 0;
539 long ref = row.getLong("lockreference");
540 LockType lockType = row.get("lockType", LockType.class);
// The row being promoted: refuse when a PROMOTING entry was already seen.
542 if (refToPromote==ref) {
543 if (promotionOngoing) {
544 return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref.");
546 seenLockToPromote = true;
548 readBlockStart = ref;
// A row with a larger reference was reached without seeing the one to promote, so the
// requested reference is not in the queue.
552 } else if (!seenLockToPromote && refToPromote<ref) {
553 return new ReturnType(ResultType.FAILURE, "Lockref does not exist.");
// READ and PROMOTING entries are handled together here.
556 if (lockType==LockType.READ || lockType==LockType.PROMOTING) {
558 readBlockStart = ref;
564 if (lockType==LockType.PROMOTING) {
565 promotionOngoing = true;
// A WRITE entry: promoting an entry that is already WRITE is rejected.
569 if (lockType==LockType.WRITE) {
570 if (refToPromote==ref) {
571 return new ReturnType(ResultType.FAILURE, "Lockref is already write.");
575 promotionOngoing = false;
576 if (seenLockToPromote) {
579 //can no longer be lock holder after this
// The reference lies inside the tracked block: promote straight to WRITE when it is the only
// entry of the block, otherwise mark it PROMOTING and report that the upgrade is pending.
585 if (readBlockStart<=refToPromote && refToPromote<=readBlockEnd) {
586 if (readBlockStart==refToPromote && refToPromote==readBlockEnd) {
587 promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.WRITE);
588 return new ReturnType(ResultType.SUCCESS, "Lock has successfully been upgraded.");
590 promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.PROMOTING);
591 return new ReturnType(ResultType.FAILURE, "Your lock upgrade is in progress. Check again to see if successful.");
594 //shouldn't reach here?
595 return new ReturnType(ResultType.FAILURE,"Promotion failed.");
598 private void promoteLockTo(String keyspace, String table, String key, String lockRef, LockType newLockType)
599 throws MusicServiceException, MusicQueryException {
600 PreparedQueryObject queryObject =
601 new PreparedQueryObject("UPDATE " + keyspace + "." + table + " SET lockType=? WHERE key='" + key
602 + "' AND lockReference = " + lockRef + " IF EXISTS;", newLockType);
604 //cannot use executePut because we need to ignore music timestamp adjustments for lock store
605 dsHandle.executePut(queryObject, MusicUtil.QUORUM);