2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (C) 2019 IBM.
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 * ============LICENSE_END=============================================
21 * ====================================================================
24 package org.onap.music.lockingservice.cassandra;
26 import java.util.ArrayList;
27 import java.util.Iterator;
28 import java.util.List;
30 import org.onap.music.datastore.MusicDataStore;
31 import org.onap.music.datastore.PreparedQueryObject;
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.exceptions.MusicLockingException;
34 import org.onap.music.exceptions.MusicQueryException;
35 import org.onap.music.exceptions.MusicServiceException;
36 import org.onap.music.main.DeadlockDetectionUtil;
37 import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
38 import org.onap.music.main.MusicUtil;
40 import com.datastax.driver.core.ResultSet;
41 import com.datastax.driver.core.Row;
44 * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state.
47 public class CassaLockStore {
// Logger through which every method of this class reports its activity.
49 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
// Prefix prepended to an application table name to form the name of its lock-queue table.
50 private static String table_prepend_name = "lockQ_";
// Data-store handle through which the queries built in this class are executed.
51 private MusicDataStore dsHandle;
// Creates a lock store that runs its queries through a newly constructed MusicDataStore.
53 public CassaLockStore() {
54 dsHandle = new MusicDataStore();
// Creates a lock store that runs its queries through the supplied data-store handle.
57 public CassaLockStore(MusicDataStore dsHandle) {
58 this.dsHandle=dsHandle;
// Value holder describing one entry of a lock queue: whether it currently owns the lock,
// its lock reference, its creation/acquisition times, its lock type and its declared owner.
60 public class LockObject{
// True when this entry is reported as a current holder of the lock.
61 private boolean isLockOwner;
// Lock reference of the entry; callers in this file pass the numeric reference rendered as a string.
62 private String lockRef;
// Creation time as text; genLockRefandEnQueue stores epoch milliseconds rendered as a string.
63 private String createTime;
// Acquisition time as text; genLockRefandEnQueue stores "0" when the entry is first enqueued.
64 private String acquireTime;
// Lock type (LockType.WRITE or LockType.READ) of the entry.
65 private LockType locktype;
66 // Owner is the self-declared client which "owns" this row. It is used for deadlock detection. It is not (directly) related to isLockOwner.
// Populates the object through its setters.
// NOTE(review): no visible line stores the owner argument, and the owner field declaration is
// not visible either; lines appear to be missing from this excerpt. Confirm owner is assigned.
68 public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype, String owner) {
69 this.setIsLockOwner(isLockOwner);
70 this.setLockRef(lockRef);
71 this.setAcquireTime(acquireTime);
72 this.setCreateTime(createTime);
73 this.setLocktype(locktype);
// Accessors follow. The getter bodies are not visible in this excerpt; presumably each returns
// the field of the same name (TODO confirm).
76 public boolean getIsLockOwner() {
79 public void setIsLockOwner(boolean isLockOwner) {
80 this.isLockOwner = isLockOwner;
82 public String getAcquireTime() {
85 public void setAcquireTime(String acquireTime) {
86 this.acquireTime = acquireTime;
88 public String getCreateTime() {
91 public void setCreateTime(String createTime) {
92 this.createTime = createTime;
94 public String getLockRef() {
97 public void setLockRef(String lockRef) {
98 this.lockRef = lockRef;
100 public LockType getLocktype() {
103 public void setLocktype(LockType locktype) {
104 this.locktype = locktype;
106 public String getOwner() {
// The body of setOwner is not visible in this excerpt.
109 public void setOwner(String owner) {
116 * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks.
117 * @param keyspace of the application.
118 * @param table of the application.
119 * @return true if the operation was successful.
120 * @throws MusicServiceException
121 * @throws MusicQueryException
// Creates (if absent) the lock-queue table that shadows keyspace.table and executes the
// statement through executePut with "eventual" consistency.
123 public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
124 logger.info(EELFLoggerDelegate.applicationLogger,
125 "Create lock queue/table for " + keyspace+"."+table);
// From here on "table" names the lock-queue table (prefix + application table name).
126 table = table_prepend_name+table;
// Schema: "key" is the partition key, "lockReference" the clustering column in ascending order,
// and "guard" is a static column, i.e. one value shared by all rows of a key.
127 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
128 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
129 + "writeLock boolean, owner text, PRIMARY KEY ((key), lockReference) ) "
130 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
131 PreparedQueryObject queryObject = new PreparedQueryObject();
133 queryObject.appendQueryString(tabQuery);
// "result" is declared on a line not visible in this excerpt; presumably it is what the method
// returns (TODO confirm).
135 result = dsHandle.executePut(queryObject, "eventual");
140 * This method creates a lock reference for each invocation and enqueues it. The lock reference is derived from the per-key guard counter (previous guard value + 1 when a guard already exists).
141 * @param keyspace of the locks.
142 * @param table of the locks.
143 * @param lockName is the primary key of the lock table
144 * @param lockType is the type of lock (read/write)
145 * @param owner is the owner of the lock (optional, for deadlock detection)
146 * @return the lock reference string, formatted as $keyspace.table.lockName$lockRef.
147 * @throws MusicServiceException
148 * @throws MusicQueryException
// Public entry point: delegates to the private overload with its attempt counter starting at 0.
150 public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner) throws MusicServiceException, MusicQueryException, MusicLockingException {
151 return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, 0);
// Generates the next lock reference for lockName and enqueues it in the lock-queue table.
// "count" is the attempt counter used to bound the recursive retries below.
// Returns the reference formatted as $keyspace.table.lockName$lockRef.
// Throws MusicLockingException when count exceeds MusicUtil.getRetryCount() after a failed write.
154 private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, String owner, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
155 logger.info(EELFLoggerDelegate.applicationLogger,
156 "Create " + locktype + " lock reference for " + keyspace + "." + table + "." + lockName);
157 String lockTable ="";
158 lockTable = table_prepend_name+table;
// Step 1: read the current guard value for this key.
160 PreparedQueryObject queryObject = new PreparedQueryObject();
161 String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
163 queryObject.addValue(lockName);
164 queryObject.appendQueryString(selectQuery);
165 ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
166 List<Row> latestGuardRow = gqResult.all();
// Step 2: when a guard row exists, the new reference is the previous guard + 1.
// The declarations and initial values of prevGuard and lockRef are on lines not visible in
// this excerpt (TODO confirm their defaults).
170 if (!latestGuardRow.isEmpty()) {
171 prevGuard = latestGuardRow.get(0).getLong(0);
172 lockRef = prevGuard + 1;
175 long lockEpochMillis = System.currentTimeMillis();
// Note that this message is logged before the batch below has been executed.
177 logger.info(EELFLoggerDelegate.applicationLogger,
178 "Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
// Step 3: one batch that conditionally advances the guard (only if it still has the value read
// above, or is NULL when prevGuard is 0) and inserts the queue row only if it does not exist yet.
180 queryObject = new PreparedQueryObject();
181 String insQuery = "BEGIN BATCH" +
182 " UPDATE " + keyspace + "." + lockTable +
183 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
184 " INSERT INTO " + keyspace + "." + lockTable +
185 "(key, lockReference, createTime, acquireTime, writeLock, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
// Values are bound in placeholder order: new guard, key, ...
187 queryObject.addValue(lockRef);
188 queryObject.addValue(lockName);
// NOTE(review): the query only contains a placeholder for prevGuard when prevGuard != 0;
// presumably this bind is guarded by a condition on a line not visible here. Confirm.
190 queryObject.addValue(prevGuard);
// ... then the inserted row: key, lockReference, createTime (epoch millis as text),
// acquireTime ("0" at enqueue time), writeLock flag and owner.
192 queryObject.addValue(lockName);
193 queryObject.addValue(lockRef);
194 queryObject.addValue(String.valueOf(lockEpochMillis));
195 queryObject.addValue("0");
196 queryObject.addValue(locktype==LockType.WRITE ? true : false );
197 queryObject.addValue(owner);
198 queryObject.appendQueryString(insQuery);
199 boolean pResult = dsHandle.executePut(queryObject, "critical");
200 if (!pResult) {// couldn't create lock ref, retry
// NOTE(review): no visible line increments count before this check; presumably that happens
// on a line missing from this excerpt. Confirm, otherwise the retry bound is never reached.
202 if (count > MusicUtil.getRetryCount()) {
203 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
204 throw new MusicLockingException("Unable to create lock reference");
// Retry from the top: the guard is re-read and a new reference is computed.
206 return genLockRefandEnQueue(keyspace, table, lockName, locktype, owner, count);
// The returned reference uses the application table name, not the lock-queue table name.
208 return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
212 * Returns a result set containing the list of clients waiting for a particular lock
217 * @return list of lockrefs in the queue
218 * @throws MusicServiceException
219 * @throws MusicQueryException
221 public List<String> getLockQueue(String keyspace, String table, String key)
222 throws MusicServiceException, MusicQueryException {
223 logger.info(EELFLoggerDelegate.applicationLogger,
224 "Getting the queue for " + keyspace + "." + table + "." + key);
225 table = table_prepend_name + table;
226 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
227 PreparedQueryObject queryObject = new PreparedQueryObject();
228 queryObject.appendQueryString(selectQuery);
229 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
230 ArrayList<String> lockQueue = new ArrayList<>();
232 lockQueue.add(Long.toString(row.getLong("lockReference")));
239 * Returns a result set containing the list of clients waiting for a particular lock
244 * @return size of lockrefs queue
245 * @throws MusicServiceException
246 * @throws MusicQueryException
248 public long getLockQueueSize(String keyspace, String table, String key)
249 throws MusicServiceException, MusicQueryException {
250 logger.info(EELFLoggerDelegate.applicationLogger,
251 "Getting the queue size for " + keyspace + "." + table + "." + key);
252 table = table_prepend_name + table;
253 String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
254 PreparedQueryObject queryObject = new PreparedQueryObject();
255 queryObject.appendQueryString(selectQuery);
256 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
257 return rs.one().getLong("count");
262 * This method returns the top of lock table/queue for the key.
264 * @param keyspace of the application.
265 * @param table of the application.
266 * @param key is the primary key of the application table
267 * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the
269 * @throws MusicServiceException
270 * @throws MusicQueryException
272 public LockObject peekLockQueue(String keyspace, String table, String key)
273 throws MusicServiceException, MusicQueryException {
274 logger.info(EELFLoggerDelegate.applicationLogger,
275 "Peek in lock table for " + keyspace + "." + table + "." + key);
276 table = table_prepend_name + table;
277 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;";
278 PreparedQueryObject queryObject = new PreparedQueryObject();
279 queryObject.appendQueryString(selectQuery);
280 ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
281 Row row = results.one();
282 if (row == null || row.isNull("lockReference")) {
283 return new LockObject(false, null, null, null, null, null);
285 String lockReference = "" + row.getLong("lockReference");
286 String createTime = row.getString("createTime");
287 String acquireTime = row.getString("acquireTime");
288 LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
289 String owner = row.getString("owner");
291 return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
// Collects the lock references that currently hold the lock on key, each formatted as
// $keyspace.table.key$lockReference.
// NOTE(review): the loop header and several branch/exit lines are missing from this excerpt,
// so the exact holder-selection rule cannot be read from here. Presumably a write lock holds
// the lock only at the top of the queue, while leading read locks share it; confirm.
294 public List<String> getCurrentLockHolders(String keyspace, String table, String key)
295 throws MusicServiceException, MusicQueryException {
296 logger.info(EELFLoggerDelegate.applicationLogger,
297 "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
// Keep the application table name: the returned references use it, not the lock-queue table name.
298 String origTable = table;
299 table = table_prepend_name + table;
300 String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
301 List<String> lockHolders = new ArrayList<>();
302 PreparedQueryObject queryObject = new PreparedQueryObject();
303 queryObject.appendQueryString(selectQuery);
304 queryObject.addValue(key);
305 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
306 boolean topOfQueue = true;
// Common prefix of every returned reference: $keyspace.table.key$
307 StringBuilder lock = new StringBuilder().
308 append("$").append(keyspace).append(".").append(origTable).
309 append(".").append(key).append("$");
// Scratch buffer, reused for each row and cleared after it.
310 StringBuilder lockReference = new StringBuilder();
// Rows without a lockReference get special handling; its body is not visible here.
312 if ( row.isNull("lockReference") ) {
315 lockReference.append(lock).append(row.getLong("lockReference"));
// A row without a writeLock value is treated as a write lock.
316 if (row.isNull("writeLock") || row.getBool("writeLock")) {
318 lockHolders.add(lockReference.toString());
325 lockHolders.add(lockReference.toString());
// Clear the scratch buffer for the next row.
328 lockReference.delete(0,lockReference.length());
334 * Determine if the lock is a valid current lock holder.
340 * @return true if lockRef is a lock owner of key
341 * @throws MusicServiceException
342 * @throws MusicQueryException
// Checks whether lockRef is currently a holder of the lock on key.
// lockRef is compared against the numeric lockReference rendered as a string, so it must be
// the bare number rather than the $keyspace.table.key$ref form.
// NOTE(review): the loop header, the else-branches and the return statements are missing from
// this excerpt; the comments below describe only what the visible lines show.
344 public boolean isLockOwner(String keyspace, String table, String key, String lockRef)
345 throws MusicServiceException, MusicQueryException {
346 logger.info(EELFLoggerDelegate.applicationLogger,
347 "Checking in lock table for " + keyspace + "." + table + "." + key);
348 table = table_prepend_name + table;
// This string is the initializer of selectQuery; its declaration line is not visible here.
350 "select * from " + keyspace + "." + table + " where key=?;";
351 PreparedQueryObject queryObject = new PreparedQueryObject();
352 queryObject.appendQueryString(selectQuery);
353 queryObject.addValue(key);
354 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
356 boolean topOfQueue = true;
358 String lockReference = "" + row.getLong("lockReference");
// A row without a writeLock value is treated as a write lock.
359 if (row.isNull("writeLock") || row.getBool("writeLock")) {
// A write-lock row matches only while topOfQueue is still true.
360 if (topOfQueue && lockRef.equals(lockReference)) {
// Here equality alone is tested; presumably this is the read-lock branch (TODO confirm).
366 if (lockRef.equals(lockReference)) {
// Logged when lockRef was not found among the rows read for this key.
371 logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
372 + " in the lock queue. It has expired and no longer exists.");
377 * Looks up the lock-queue row for the given key and lock reference and reports its details.
383 * @return a LockObject describing lockRef, including whether it is currently a lock owner of key
384 * @throws MusicServiceException
385 * @throws MusicQueryException
// Reads the single lock-queue row identified by (key, lockRef) and returns it as a LockObject
// whose isLockOwner flag is computed by isLockOwner(...).
// lockRef must be parseable by Long.parseLong, i.e. the bare numeric reference.
387 public LockObject getLockInfo(String keyspace, String table, String key, String lockRef)
388 throws MusicServiceException, MusicQueryException {
389 logger.info(EELFLoggerDelegate.applicationLogger,
390 "Checking in lock table for " + keyspace + "." + table + "." + key);
// A separate variable keeps "table" unprefixed for the isLockOwner call below, which adds the
// prefix itself.
391 String lockQ_table = table_prepend_name + table;
// This string is the initializer of selectQuery; its declaration line is not visible here.
393 "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;";
394 PreparedQueryObject queryObject = new PreparedQueryObject();
395 queryObject.appendQueryString(selectQuery);
396 queryObject.addValue(key);
397 queryObject.addValue(Long.parseLong(lockRef));
398 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
// "row" is obtained on a line not visible here. What is returned when no usable row exists
// is also not visible (TODO confirm).
400 if (row == null || row.isNull("lockReference")) {
404 String lockReference = "" + row.getLong("lockReference");
405 String createTime = row.getString("createTime");
406 String acquireTime = row.getString("acquireTime");
// A row without a writeLock value is treated as a write lock.
407 LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
// This issues a second query that reads all rows of the key to decide ownership.
408 boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
409 String owner = row.getString("owner");
411 return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype, owner);
417 * This method removes the lock ref from the lock table/queue for the key.
419 * @param keyspace of the application.
420 * @param table of the application.
421 * @param key is the primary key of the application table
422 * @param lockReference the lock reference that needs to be dequeued.
423 * @throws MusicServiceException
424 * @throws MusicQueryException
425 * @throws MusicLockingException
// Deletes the row (key, lockReference) from the lock-queue table, retrying on
// MusicServiceException.
// n is passed as n - 1 on each retry, so it acts as a remaining-attempts counter. The condition
// that chooses between retrying and giving up is on a line not visible here (TODO confirm).
// Throws MusicLockingException when the delete fails and no further retry is made.
427 public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n)
428 throws MusicServiceException, MusicQueryException, MusicLockingException {
429 String prependTable = table_prepend_name + table;
430 PreparedQueryObject queryObject = new PreparedQueryObject();
// Accepts both a bare number and the $keyspace.table.key$ref form: only the text after the
// last '$' is parsed.
431 Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1));
// NOTE(review): key is concatenated into the CQL text here, whereas getCurrentLockHolders and
// isLockOwner bind it as a value; consider binding it to avoid quoting/injection problems.
432 String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key
433 + "' AND lockReference =" + lockReferenceL + " IF EXISTS;";
434 queryObject.appendQueryString(deleteQuery);
435 logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
// The boolean result of executePut is not inspected on this line.
437 dsHandle.executePut(queryObject, "critical");
438 logger.info(EELFLoggerDelegate.applicationLogger,
439 "Lock removed for key: " + key + " and reference: " + lockReference);
440 } catch (MusicServiceException ex) {
// NOTE(review): the logger passes itself as the first argument here, while other calls pass
// EELFLoggerDelegate.applicationLogger; confirm this is intended.
441 logger.error(logger, ex.getMessage(), ex);
442 logger.error(EELFLoggerDelegate.applicationLogger,
443 "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference);
// Retry path: the same delete is attempted again with one fewer attempt remaining.
445 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
446 deQueueLockRef(keyspace, table, key, lockReference, n - 1);
// Give-up path: the failure is logged and rethrown as MusicLockingException, carrying only the
// message of the original exception.
448 logger.error(EELFLoggerDelegate.applicationLogger,
449 "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference);
450 logger.error(logger, ex.getMessage(), ex);
451 throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage());
457 public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference)
458 throws MusicServiceException, MusicQueryException {
459 table = table_prepend_name + table;
460 PreparedQueryObject queryObject = new PreparedQueryObject();
461 Long lockReferenceL = Long.parseLong(lockReference);
462 String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
463 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
464 queryObject.appendQueryString(updateQuery);
466 //cannot use executePut because we need to ignore music timestamp adjustments for lock store
467 dsHandle.getSession().execute(updateQuery);
// Checks whether owner taking (forAcquire=true) or creating (forAcquire=false) a lock on
// lockName would produce a deadlock, judged by DeadlockDetectionUtil from the write-lock rows
// currently present in the lock-queue table.
// Read locks and requests without an owner are never checked: the method returns false for them.
470 public boolean checkForDeadlock(String keyspace, String table, String lockName, LockType locktype, String owner, boolean forAcquire) throws MusicServiceException, MusicQueryException {
471 if (locktype.equals(LockType.READ)) return false;
472 if (owner==null || owner.length()==0) return false;
474 String lockTable = table_prepend_name + table;
475 PreparedQueryObject queryObject = new PreparedQueryObject();
// Reads every write-lock row of the whole lock table; ALLOW FILTERING is needed because
// writelock is not part of the primary key.
476 queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
477 queryObject.appendQueryString(" WHERE writelock = True ALLOW FILTERING");
479 DeadlockDetectionUtil ddu = new DeadlockDetectionUtil();
481 ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
482 logger.debug("rs has " + rs.getAvailableWithoutFetching() + (rs.isFullyFetched()?"":" (or more!)") );
483 Iterator<Row> it = rs.iterator();
484 while (it.hasNext()) {
// "row" is obtained on a line not visible here; presumably it.next() (TODO confirm).
486 logger.debug("key = " + row.getString("key") + ", time = " + row.getString("acquiretime") + ", owner = " + row.getString("owner") );
// An acquiretime of "0" marks a lock that was created but not yet acquired.
487 ddu.setExisting(row.getString("key"), row.getString("owner"), ("0".equals(row.getString("acquiretime")))?OwnershipType.CREATED:OwnershipType.ACQUIRED);
489 boolean deadlock = ddu.checkForDeadlock(lockName, owner, forAcquire?OwnershipType.ACQUIRED:OwnershipType.CREATED);
// The return statement is not visible here; presumably "deadlock" is returned (TODO confirm).
490 if (deadlock) logger.warn("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + lockTable + "." + lockName);
// Collects every lock-queue row of keyspace.table whose owner column equals ownerId, each
// rendered as key$lockreference.
// The end of this method lies beyond the visible excerpt; presumably toRet is returned
// (TODO confirm).
494 public List<String> getAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicServiceException, MusicQueryException {
495 List<String> toRet = new ArrayList<String>();
496 String lockTable = table_prepend_name + table;
497 PreparedQueryObject queryObject = new PreparedQueryObject();
498 queryObject.appendQueryString("SELECT key, lockreference FROM " + keyspace + "." + lockTable);
// NOTE(review): ownerId is concatenated into the CQL text; other methods in this class bind
// such values with "?" and addValue. Consider binding it to avoid quoting/injection problems.
// ALLOW FILTERING is needed because owner is not part of the primary key.
499 queryObject.appendQueryString(" WHERE owner = '" + ownerId + "' ALLOW FILTERING");
501 ResultSet rs = dsHandle.executeQuorumConsistencyGet(queryObject);
502 Iterator<Row> it = rs.iterator();
503 while (it.hasNext()) {
// "row" is obtained on a line not visible here; presumably it.next() (TODO confirm).
505 toRet.add(row.getString("key") + "$" + row.getLong("lockreference"));