2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (C) 2019 IBM.
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 * ============LICENSE_END=============================================
21 * ====================================================================
24 package org.onap.music.lockingservice.cassandra;
26 import java.util.ArrayList;
27 import java.util.List;
29 import org.onap.music.datastore.MusicDataStore;
30 import org.onap.music.datastore.PreparedQueryObject;
31 import org.onap.music.eelf.logging.EELFLoggerDelegate;
32 import org.onap.music.exceptions.MusicLockingException;
33 import org.onap.music.exceptions.MusicQueryException;
34 import org.onap.music.exceptions.MusicServiceException;
35 import org.onap.music.main.MusicUtil;
37 import com.datastax.driver.core.ResultSet;
38 import com.datastax.driver.core.Row;
41 * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state.
44 public class CassaLockStore {
// Logger for this class.
46 private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
// Prefix prepended to an application table name to form the name of its lock table.
47 private static String table_prepend_name = "lockQ_";
// Data store handle through which the lock-table statements in this class are executed.
48 private MusicDataStore dsHandle;
// Creates a lock store backed by a newly constructed MusicDataStore.
50 public CassaLockStore() {
51 dsHandle = new MusicDataStore();
// Creates a lock store that uses the supplied data store handle.
54 public CassaLockStore(MusicDataStore dsHandle) {
55 this.dsHandle=dsHandle;
// Holder for one lock-queue entry: the lock reference, its create and acquire times (kept as
// strings), its lock type, and a flag saying whether the entry is a lock owner.
// NOTE(review): the getter bodies fall on lines not shown in this excerpt; presumably each one
// returns the matching field - confirm.
57 public class LockObject{
58 private boolean isLockOwner;
59 private String lockRef;
60 private String createTime;
61 private String acquireTime;
62 private LockType locktype;
// Populates every field through its setter.
63 public LockObject(boolean isLockOwner, String lockRef, String createTime, String acquireTime, LockType locktype) {
64 this.setIsLockOwner(isLockOwner);
65 this.setLockRef(lockRef);
66 this.setAcquireTime(acquireTime);
67 this.setCreateTime(createTime);
68 this.setLocktype(locktype);
70 public boolean getIsLockOwner() {
73 public void setIsLockOwner(boolean isLockOwner) {
74 this.isLockOwner = isLockOwner;
76 public String getAcquireTime() {
79 public void setAcquireTime(String acquireTime) {
80 this.acquireTime = acquireTime;
82 public String getCreateTime() {
85 public void setCreateTime(String createTime) {
86 this.createTime = createTime;
88 public String getLockRef() {
91 public void setLockRef(String lockRef) {
92 this.lockRef = lockRef;
94 public LockType getLocktype() {
97 public void setLocktype(LockType locktype) {
98 this.locktype = locktype;
104 * This method creates a shadow locking table for every main table in Cassandra. This table tracks all information regarding locks.
105 * @param keyspace of the application.
106 * @param table of the application.
107 * @return true if the operation was successful.
108 * @throws MusicServiceException
109 * @throws MusicQueryException
111 public boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
112 logger.info(EELFLoggerDelegate.applicationLogger,
113 "Create lock queue/table for " + keyspace+"."+table);
114 table = table_prepend_name+table;
115 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
116 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
117 + "writeLock boolean, PRIMARY KEY ((key), lockReference) ) "
118 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
119 PreparedQueryObject queryObject = new PreparedQueryObject();
121 queryObject.appendQueryString(tabQuery);
123 result = dsHandle.executePut(queryObject, "eventual");
128 * This method creates a lock reference for each invocation. Each new reference is one greater than the guard value last stored for the key, so the references for a key increase monotonically.
129 * @param keyspace of the locks.
130 * @param table of the locks.
131 * @param lockName is the primary key of the lock table
132 * @return the lock reference string, of the form $keyspace.table.lockName$lockRef.
133 * @throws MusicServiceException
134 * @throws MusicQueryException
// Public entry point: delegates to the private overload, passing 0 as the initial retry count.
136 public String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype) throws MusicServiceException, MusicQueryException, MusicLockingException {
137 return genLockRefandEnQueue(keyspace, table, lockName, locktype, 0);
// Reads the current guard value for lockName, then submits a conditional batch that advances the
// guard and inserts a new queue row. If the batch is not applied the method calls itself again,
// and throws MusicLockingException once count exceeds MusicUtil.getRetryCount().
// On success it returns "$" + keyspace + "." + table + "." + lockName + "$" + lockRef.
140 private String genLockRefandEnQueue(String keyspace, String table, String lockName, LockType locktype, int count) throws MusicServiceException, MusicQueryException, MusicLockingException {
141 logger.info(EELFLoggerDelegate.applicationLogger,
142 "Create " + locktype + " lock reference for " + keyspace + "." + table + "." + lockName);
143 String lockTable ="";
144 lockTable = table_prepend_name+table;
// Step 1: fetch the guard column for this key.
148 PreparedQueryObject queryObject = new PreparedQueryObject();
149 String selectQuery = "SELECT guard FROM " + keyspace + "." + lockTable + " WHERE key=?;";
151 queryObject.addValue(lockName);
152 queryObject.appendQueryString(selectQuery);
153 ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
154 List<Row> latestGuardRow = gqResult.all();
// NOTE(review): the declarations and initial values of prevGuard and lockRef fall on lines not
// shown in this excerpt - confirm them against the full file.
// When a row exists for the key, the new reference is the stored guard plus one.
158 if (!latestGuardRow.isEmpty()) {
159 prevGuard = latestGuardRow.get(0).getLong(0);
160 lockRef = prevGuard + 1;
// Wall-clock time recorded as the createTime of the new queue entry.
163 long lockEpochMillis = System.currentTimeMillis();
165 logger.info(EELFLoggerDelegate.applicationLogger,
166 "Created lock reference for " + keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
// Step 2: conditional batch. The guard update applies only if the guard still has the value read
// above (compared against NULL when prevGuard is 0); the insert applies only if the row is absent.
168 queryObject = new PreparedQueryObject();
169 String insQuery = "BEGIN BATCH" +
170 " UPDATE " + keyspace + "." + lockTable +
171 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
172 " INSERT INTO " + keyspace + "." + lockTable +
173 "(key, lockReference, createTime, acquireTime, writeLock) VALUES (?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
// Bind values are added in placeholder order: new guard, key, [previous guard], then the five
// insert columns.
175 queryObject.addValue(lockRef);
176 queryObject.addValue(lockName);
// NOTE(review): the query has a placeholder for prevGuard only when prevGuard != 0; the condition
// guarding this addValue is presumably on a line not shown here - confirm.
178 queryObject.addValue(prevGuard);
180 queryObject.addValue(lockName);
181 queryObject.addValue(lockRef);
182 queryObject.addValue(String.valueOf(lockEpochMillis));
// acquireTime is stored as "0" when the entry is created.
183 queryObject.addValue("0");
184 queryObject.addValue(locktype==LockType.WRITE ? true : false );
185 queryObject.appendQueryString(insQuery);
186 boolean pResult = dsHandle.executePut(queryObject, "critical");
187 if (!pResult) {// couldn't create lock ref, retry
// NOTE(review): no increment of count is visible in this excerpt; presumably it happens on a line
// not shown before this comparison - confirm, otherwise the retry below would never stop.
189 if (count > MusicUtil.getRetryCount()) {
190 logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to create lock reference");
191 throw new MusicLockingException("Unable to create lock reference");
193 return genLockRefandEnQueue(keyspace, table, lockName, locktype, count);
195 return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
201 * Returns a result set containing the list of clients waiting for a particular lock
206 * @return list of lockrefs in the queue
207 * @throws MusicServiceException
208 * @throws MusicQueryException
210 public List<String> getLockQueue(String keyspace, String table, String key)
211 throws MusicServiceException, MusicQueryException {
212 logger.info(EELFLoggerDelegate.applicationLogger,
213 "Getting the queue for " + keyspace + "." + table + "." + key);
214 table = table_prepend_name + table;
215 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "';";
216 PreparedQueryObject queryObject = new PreparedQueryObject();
217 queryObject.appendQueryString(selectQuery);
218 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
219 ArrayList<String> lockQueue = new ArrayList<>();
221 lockQueue.add(Long.toString(row.getLong("lockReference")));
228 * Returns a result set containing the list of clients waiting for a particular lock
233 * @return size of lockrefs queue
234 * @throws MusicServiceException
235 * @throws MusicQueryException
237 public long getLockQueueSize(String keyspace, String table, String key)
238 throws MusicServiceException, MusicQueryException {
239 logger.info(EELFLoggerDelegate.applicationLogger,
240 "Getting the queue size for " + keyspace + "." + table + "." + key);
241 table = table_prepend_name + table;
242 String selectQuery = "select count(*) from " + keyspace + "." + table + " where key='" + key + "';";
243 PreparedQueryObject queryObject = new PreparedQueryObject();
244 queryObject.appendQueryString(selectQuery);
245 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
246 return rs.one().getLong("count");
251 * This method returns the top of lock table/queue for the key.
253 * @param keyspace of the application.
254 * @param table of the application.
255 * @param key is the primary key of the application table
256 * @return the UUID lock reference. Returns LockObject.isLockOwner=false if there is no owner or the
258 * @throws MusicServiceException
259 * @throws MusicQueryException
261 public LockObject peekLockQueue(String keyspace, String table, String key)
262 throws MusicServiceException, MusicQueryException {
263 logger.info(EELFLoggerDelegate.applicationLogger,
264 "Peek in lock table for " + keyspace + "." + table + "." + key);
265 table = table_prepend_name + table;
266 String selectQuery = "select * from " + keyspace + "." + table + " where key='" + key + "' LIMIT 1;";
267 PreparedQueryObject queryObject = new PreparedQueryObject();
268 queryObject.appendQueryString(selectQuery);
269 ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
270 Row row = results.one();
271 if (row == null || row.isNull("lockReference")) {
272 return new LockObject(false, null, null, null, null);
274 String lockReference = "" + row.getLong("lockReference");
275 String createTime = row.getString("createTime");
276 String acquireTime = row.getString("acquireTime");
277 LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
279 return new LockObject(true, lockReference, createTime, acquireTime, locktype);
// Collects the lock references that currently hold the lock for key by reading the rows stored
// for that key in the lock table. Each entry is formatted as "$keyspace.table.key$lockReference",
// using the application table name rather than the prefixed lock-table name.
// NOTE(review): the row loop header, its early exits and any update of topOfQueue fall on lines
// not shown in this excerpt, so the exact stopping rule cannot be read off the lines below.
282 public List<String> getCurrentLockHolders(String keyspace, String table, String key)
283 throws MusicServiceException, MusicQueryException {
284 logger.info(EELFLoggerDelegate.applicationLogger,
285 "Getting lockholders in lock table for " + keyspace + "." + table + "." + key);
// Keep the caller's table name; it is needed for the returned reference strings.
286 String origTable = table;
287 table = table_prepend_name + table;
288 String selectQuery = "select * from " + keyspace + "." + table + " where key=?;";
289 List<String> lockHolders = new ArrayList<>();
290 PreparedQueryObject queryObject = new PreparedQueryObject();
291 queryObject.appendQueryString(selectQuery);
292 queryObject.addValue(key);
293 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
294 boolean topOfQueue = true;
// Common "$keyspace.table.key$" prefix shared by every returned reference.
295 StringBuilder lock = new StringBuilder().
296 append("$").append(keyspace).append(".").append(origTable).
297 append(".").append(key).append("$");
// Scratch buffer reused for each row; it is emptied by the delete(...) call at the end.
298 StringBuilder lockReference = new StringBuilder();
300 if ( row.isNull("lockReference") ) {
303 lockReference.append(lock).append(row.getLong("lockReference"));
// A row with no writeLock value is treated as a write lock.
304 if (row.isNull("writeLock") || row.getBool("writeLock")) {
306 lockHolders.add(lockReference.toString());
// Outside the write-lock branch: the row is a read lock and is added as a holder.
313 lockHolders.add(lockReference.toString());
316 lockReference.delete(0,lockReference.length());
322 * Determine if the lock is a valid current lock holder.
328 * @return true if lockRef is a lock owner of key
329 * @throws MusicServiceException
330 * @throws MusicQueryException
// Reads the rows stored for key in the lock table and compares each row's lockReference,
// rendered as a decimal string, with lockRef.
// NOTE(review): the loop header, the return statements and any update of topOfQueue fall on lines
// not shown in this excerpt; the javadoc states the result is true when lockRef is a lock owner.
332 public boolean isLockOwner(String keyspace, String table, String key, String lockRef)
333 throws MusicServiceException, MusicQueryException {
334 logger.info(EELFLoggerDelegate.applicationLogger,
335 "Checking in lock table for " + keyspace + "." + table + "." + key);
336 table = table_prepend_name + table;
338 "select * from " + keyspace + "." + table + " where key=?;";
339 PreparedQueryObject queryObject = new PreparedQueryObject();
340 queryObject.appendQueryString(selectQuery);
341 queryObject.addValue(key);
342 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
344 boolean topOfQueue = true;
346 String lockReference = "" + row.getLong("lockReference");
// A row with no writeLock value is treated as a write lock. For a write-lock row the match also
// requires topOfQueue to be true.
347 if (row.isNull("writeLock") || row.getBool("writeLock")) {
348 if (topOfQueue && lockRef.equals(lockReference)) {
// Outside the write-lock branch the row is a read lock and only the reference is compared.
354 if (lockRef.equals(lockReference)) {
// This message is logged when lockRef was not matched above.
359 logger.info(EELFLoggerDelegate.applicationLogger, "Could not find " + lockRef
360 + " in the lock queue. It has expired and no longer exists.");
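365 * Look up the queue entry for a lock reference and report whether it is a current lock holder.
371 * @return a LockObject describing lockRef, whose isLockOwner flag is the result of isLockOwner(keyspace, table, key, lockRef)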
372 * @throws MusicServiceException
373 * @throws MusicQueryException
// Fetches the single lock-table row identified by (key, lockRef) and wraps its columns in a
// LockObject, with the owner flag computed by isLockOwner.
// lockRef must be a decimal number; Long.parseLong throws NumberFormatException otherwise.
// NOTE(review): the statement that obtains row from rs, and the body of the not-found branch,
// fall on lines not shown in this excerpt - confirm what is returned when no row matches.
375 public LockObject getLockInfo(String keyspace, String table, String key, String lockRef)
376 throws MusicServiceException, MusicQueryException {
377 logger.info(EELFLoggerDelegate.applicationLogger,
378 "Checking in lock table for " + keyspace + "." + table + "." + key);
// The prefixed name goes into a separate variable because the unprefixed table name is still
// needed for the isLockOwner call below.
379 String lockQ_table = table_prepend_name + table;
381 "select * from " + keyspace + "." + lockQ_table + " where key=? and lockReference=?;";
382 PreparedQueryObject queryObject = new PreparedQueryObject();
383 queryObject.appendQueryString(selectQuery);
384 queryObject.addValue(key);
385 queryObject.addValue(Long.parseLong(lockRef));
386 ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
388 if (row == null || row.isNull("lockReference")) {
392 String lockReference = "" + row.getLong("lockReference");
393 String createTime = row.getString("createTime");
394 String acquireTime = row.getString("acquireTime");
// A row with no writeLock value is treated as a write lock.
395 LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
// Ownership is determined by a second query against the key's lock-table rows.
396 boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
398 return new LockObject(isLockOwner, lockReference, createTime, acquireTime, locktype);
404 * This method removes the lock ref from the lock table/queue for the key.
406 * @param keyspace of the application.
407 * @param table of the application.
408 * @param key is the primary key of the application table
409 * @param lockReference the lock reference that needs to be dequeued.
410 * @throws MusicServiceException
411 * @throws MusicQueryException
412 * @throws MusicLockingException
// Deletes the row (key, lockReference) from the lock table with a conditional "IF EXISTS" delete.
// When executePut throws MusicServiceException the error is logged; the visible lines then either
// call this method again with n - 1 or throw MusicLockingException.
// NOTE(review): the try opener and the condition choosing between retry and failure fall on lines
// not shown in this excerpt - confirm the retry bound on n.
414 public void deQueueLockRef(String keyspace, String table, String key, String lockReference, int n)
415 throws MusicServiceException, MusicQueryException, MusicLockingException {
416 String prependTable = table_prepend_name + table;
417 PreparedQueryObject queryObject = new PreparedQueryObject();
// Parses the text after the last '$'. A bare number is accepted too, because lastIndexOf returns
// -1 when there is no '$' and substring(0) is then the whole string.
418 Long lockReferenceL = Long.parseLong(lockReference.substring(lockReference.lastIndexOf("$") + 1));
// NOTE(review): key is concatenated into the CQL text inside single quotes; a key containing a
// quote would break or alter the statement. Other methods in this class bind the key with
// addValue - consider doing the same here.
419 String deleteQuery = "delete from " + keyspace + "." + prependTable + " where key='" + key
420 + "' AND lockReference =" + lockReferenceL + " IF EXISTS;";
421 queryObject.appendQueryString(deleteQuery);
422 logger.info(EELFLoggerDelegate.applicationLogger, "Removing lock for key: "+key+ " and reference: "+lockReference);
424 dsHandle.executePut(queryObject, "critical");
425 logger.info(EELFLoggerDelegate.applicationLogger,
426 "Lock removed for key: " + key + " and reference: " + lockReference);
427 } catch (MusicServiceException ex) {
428 logger.error(logger, ex.getMessage(), ex);
429 logger.error(EELFLoggerDelegate.applicationLogger,
430 "Exception while deQueueLockRef for lockname: " + key + " reference:" + lockReference);
// Retry path: same arguments with the remaining-attempts counter reduced by one.
432 logger.info(EELFLoggerDelegate.applicationLogger, "Trying again...");
433 deQueueLockRef(keyspace, table, key, lockReference, n - 1);
// Failure path: log, then surface the problem as a MusicLockingException. Only the message of the
// original exception is carried over, not the exception itself.
435 logger.error(EELFLoggerDelegate.applicationLogger,
436 "deQueueLockRef failed for lockname: " + key + " reference:" + lockReference);
437 logger.error(logger, ex.getMessage(), ex);
438 throw new MusicLockingException("Error while deQueueLockRef: " + ex.getMessage());
// Sets acquireTime of the row (key, lockReference) in the lock table to the current time in
// epoch milliseconds, using a conditional "IF EXISTS" update executed directly on the session.
// lockReference must be a decimal number; Long.parseLong throws NumberFormatException otherwise.
444 public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
445 table = table_prepend_name + table;
// NOTE(review): queryObject is filled in below but, in the lines visible here, never executed;
// the statement is sent as the raw updateQuery string instead.
446 PreparedQueryObject queryObject = new PreparedQueryObject();
447 Long lockReferenceL = Long.parseLong(lockReference);
// NOTE(review): key is concatenated into the CQL text inside single quotes; a key containing a
// quote would break or alter the statement - consider binding it as a value.
448 String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
449 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
450 queryObject.appendQueryString(updateQuery);
452 //cannot use executePut because we need to ignore music timestamp adjustments for lock store
453 dsHandle.getSession().execute(updateQuery);