2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (c) 2018 IBM.
7 * ===================================================================
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.service.impl;
28 import java.io.StringWriter;
29 import java.util.List;
31 import java.util.StringTokenizer;
33 import org.onap.music.datastore.MusicDataStore;
34 import org.onap.music.datastore.MusicDataStoreHandle;
35 import org.onap.music.datastore.PreparedQueryObject;
36 import org.onap.music.eelf.logging.EELFLoggerDelegate;
37 import org.onap.music.eelf.logging.format.AppMessages;
38 import org.onap.music.eelf.logging.format.ErrorSeverity;
39 import org.onap.music.eelf.logging.format.ErrorTypes;
40 import org.onap.music.exceptions.MusicLockingException;
41 import org.onap.music.exceptions.MusicQueryException;
42 import org.onap.music.exceptions.MusicServiceException;
43 import org.onap.music.lockingservice.cassandra.CassaLockStore;
44 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
45 import org.onap.music.lockingservice.cassandra.MusicLockState;
46 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
47 import org.onap.music.main.MusicUtil;
48 import org.onap.music.main.ResultType;
49 import org.onap.music.main.ReturnType;
50 import org.onap.music.service.MusicCoreService;
52 import com.datastax.driver.core.DataType;
53 import com.datastax.driver.core.ResultSet;
54 import com.datastax.driver.core.Row;
55 import com.datastax.driver.core.TableMetadata;
57 import org.onap.music.datastore.*;
59 public class MusicCassaCore implements MusicCoreService {
61 public static CassaLockStore mLockHandle = null;;
62 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
63 private static boolean unitTestRun=true;
64 private static MusicCassaCore musicCassaCoreInstance = null;
66 private MusicCassaCore() {
69 public static MusicCassaCore getInstance() {
71 if(musicCassaCoreInstance == null) {
72 musicCassaCoreInstance = new MusicCassaCore();
74 return musicCassaCoreInstance;
77 public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
78 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
79 long start = System.currentTimeMillis();
81 if (mLockHandle == null) {
83 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
84 } catch (Exception e) {
85 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
86 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
89 long end = System.currentTimeMillis();
90 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
96 public String createLockReference(String fullyQualifiedKey) {
97 String[] splitString = fullyQualifiedKey.split("\\.");
98 String keyspace = splitString[0];
99 String table = splitString[1];
100 String lockName = splitString[2];
102 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
103 long start = System.currentTimeMillis();
104 String lockReference = null;
106 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
107 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
109 logger.error(EELFLoggerDelegate.applicationLogger, e);
111 long end = System.currentTimeMillis();
112 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
113 return lockReference;
117 public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
118 evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
119 return acquireLock(fullyQualifiedKey, lockReference);
122 private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
124 String[] splitString = fullyQualifiedKey.split("\\.");
125 String keyspace = splitString[0];
126 String table = splitString[1];
127 String primaryKeyValue = splitString[2];
129 LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
131 /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
134 long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
135 if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
136 forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.lockRef+"");
137 logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.lockRef+" forcibly released");
141 private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
143 //return failure to lock holders too early or already evicted from the lock store
144 String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
145 long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
146 long lockReferenceL = Long.parseLong(lockReference);
148 if(lockReferenceL > topOfLockStoreL) {
149 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
150 return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
154 if(lockReferenceL < topOfLockStoreL) {
155 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
156 return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
159 return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
162 public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
163 throws MusicLockingException, MusicQueryException, MusicServiceException {
164 String[] splitString = lockId.split("\\.");
165 String keyspace = splitString[0].substring(1);//remove '$'
166 String table = splitString[1];
167 String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
168 String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
169 String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
171 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockRef);
173 if(result.getResult().equals(ResultType.FAILURE))
174 return result;//not top of the lock store q
176 //check to see if the value of the key has to be synced in case there was a forceful release
177 String syncTable = keyspace+".unsyncedKeys_"+table;
178 String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
179 PreparedQueryObject readQueryObject = new PreparedQueryObject();
180 readQueryObject.appendQueryString(query);
181 ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
182 if (results.all().size() != 0) {
183 logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
185 syncQuorum(keyspace, table, primaryKeyValue);
186 } catch (Exception e) {
187 StringWriter sw = new StringWriter();
188 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
189 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
190 String exceptionAsString = sw.toString();
191 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
193 String cleanQuery = "delete from music_internal.unsynced_keys where key='"+localFullyQualifiedKey+"';";
194 PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
195 deleteQueryObject.appendQueryString(cleanQuery);
196 MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
199 getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
201 return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
208 * @param tableQueryObject
210 * @return Boolean Indicates success or failure
211 * @throws MusicServiceException
215 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject, String consistency) throws MusicServiceException {
216 boolean result = false;
219 //create shadow locking table
220 result = getLockingServiceHandle().createLockQueue(keyspace, table);
222 return ResultType.FAILURE;
226 //create table to track unsynced_keys
227 table = "unsyncedKeys_"+table;
229 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
230 + " ( key text,PRIMARY KEY (key) );";
231 PreparedQueryObject queryObject = new PreparedQueryObject();
233 queryObject.appendQueryString(tabQuery);
235 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
238 //create actual table
239 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
240 } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
241 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
242 .WARN, ErrorTypes.MUSICSERVICEERROR, ex);
243 throw new MusicServiceException(ex.getMessage());
245 return result?ResultType.SUCCESS:ResultType.FAILURE;
248 private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
249 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
250 PreparedQueryObject selectQuery = new PreparedQueryObject();
251 PreparedQueryObject updateQuery = new PreparedQueryObject();
253 // get the primary key d
254 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
255 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
257 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
258 Object cqlFormattedPrimaryKeyValue =
259 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
261 // get the row of data from a quorum
262 selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
263 + primaryKeyName + "= ?" + ";");
264 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
265 MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
266 cqlFormattedPrimaryKeyValue);
275 public ResultSet quorumGet(PreparedQueryObject query) {
276 ResultSet results = null;
278 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
279 } catch (MusicServiceException | MusicQueryException e) {
280 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
281 .MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
290 * @param fullyQualifiedKey lockName
293 public String whoseTurnIsIt(String fullyQualifiedKey) {
294 String[] splitString = fullyQualifiedKey.split("\\.");
295 String keyspace = splitString[0];
296 String table = splitString[1];
297 String primaryKeyValue = splitString[2];
299 return "$" + fullyQualifiedKey + "$"
300 + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
301 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
302 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey
303 ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
310 * @param lockReference
313 public static String getLockNameFromId(String lockReference) {
314 StringTokenizer st = new StringTokenizer(lockReference);
315 return st.nextToken("$");
319 public void destroyLockRef(String lockId) {
320 long start = System.currentTimeMillis();
321 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
322 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
323 String[] splitString = fullyQualifiedKey.split("\\.");
324 String keyspace = splitString[0];
325 String table = splitString[1];
326 String primaryKeyValue = splitString[2];
328 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef);
329 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
330 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef ,
331 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
333 long end = System.currentTimeMillis();
334 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
337 public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) {
338 long start = System.currentTimeMillis();
339 String[] splitString = fullyQualifiedKey.split("\\.");
340 String keyspace = splitString[0];
341 String table = splitString[1];
342 String primaryKeyValue = splitString[2];
344 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference);
345 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
346 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockReference ,
347 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
349 long end = System.currentTimeMillis();
350 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
351 return new MusicLockState(LockStatus.UNLOCKED, "");
355 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
356 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
357 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
358 if (voluntaryRelease) {
359 return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
361 return forciblyReleaseLock(fullyQualifiedKey, lockRef);
365 public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) {
366 return destroyLockRef(fullyQualifiedKey, lockReference);
369 public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) {
370 String[] splitString = fullyQualifiedKey.split("\\.");
371 String keyspace = splitString[0];
372 String table = splitString[1];
374 //leave a signal that this key could potentially be unsynchronized
375 String syncTable = keyspace+".unsyncedKeys_"+table;
376 PreparedQueryObject queryObject = new PreparedQueryObject();
377 String values = "(?)";
378 queryObject.addValue(fullyQualifiedKey);
379 String insQuery = "insert into "+syncTable+" (key) values "+values+";";
380 queryObject.appendQueryString(insQuery);
382 MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
383 } catch (Exception e) {
384 logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
385 + e.getMessage(), e);
388 //now release the lock
389 return destroyLockRef(fullyQualifiedKey, lockReference);
395 * @throws MusicLockingException
397 public void deleteLock(String lockName) throws MusicLockingException {
401 // Prepared Query Additions.
407 * @throws MusicServiceException
409 public ReturnType eventualPut(PreparedQueryObject queryObject) {
410 boolean result = false;
412 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
413 } catch (MusicServiceException | MusicQueryException ex) {
414 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
415 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
416 return new ReturnType(ResultType.FAILURE, ex.getMessage());
419 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
421 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
429 * @throws MusicServiceException
431 public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
432 boolean result = false;
434 PreparedQueryObject getGaurd = new PreparedQueryObject();
435 getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
436 getGaurd.addValue(primaryKey);
438 ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
439 Row row = getGaurdResult.one();
441 guard = row.getLong("guard");
442 long timeOfWrite = System.currentTimeMillis();
443 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
444 String query = queryObject.getQuery();
445 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
446 if (queryObject.getOperation().equalsIgnoreCase("delete"))
447 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
449 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
451 queryObject.replaceQueryString(query);
454 } catch (MusicServiceException | MusicQueryException e) {
455 logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
458 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
459 } catch (MusicServiceException | MusicQueryException ex) {
460 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
461 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause(), ex);
462 return new ReturnType(ResultType.FAILURE, ex.getMessage());
465 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
467 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
475 * @param primaryKeyValue
480 public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
481 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
482 long start = System.currentTimeMillis();
484 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
485 lockId.substring(lockId.lastIndexOf("$")+1));
486 if(result.getResult().equals(ResultType.FAILURE))
487 return result;//not top of the lock store q
489 if (conditionInfo != null)
491 if (conditionInfo.testCondition() == false)
492 return new ReturnType(ResultType.FAILURE,
493 "Lock acquired but the condition is not true");
494 } catch (Exception e) {
495 logger.error(EELFLoggerDelegate.errorLogger, e);
496 return new ReturnType(ResultType.FAILURE,
497 "Exception thrown while checking the condition, check its sanctity:\n"
501 String query = queryObject.getQuery();
502 long timeOfWrite = System.currentTimeMillis();
503 long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
504 long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
505 // TODO: use Statement instead of modifying query
506 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
507 if (queryObject.getOperation().equalsIgnoreCase("delete"))
508 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
509 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
510 query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
512 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
514 queryObject.replaceQueryString(query);
515 MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
516 dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
517 long end = System.currentTimeMillis();
518 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
519 }catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
520 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
521 return new ReturnType(ResultType.FAILURE,
522 "Exception thrown while doing the critical put\n"
525 return new ReturnType(ResultType.SUCCESS, "Update performed");
533 * @return Boolean Indicates success or failure
534 * @throws MusicServiceException
538 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
539 // this is mainly for some functions like keyspace creation etc which does not
540 // really need the bells and whistles of Music locking.
541 boolean result = false;
543 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
544 } catch (MusicQueryException | MusicServiceException ex) {
545 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
546 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
547 throw new MusicServiceException(ex.getMessage());
549 return result ? ResultType.SUCCESS : ResultType.FAILURE;
553 * This method performs DDL operation on cassandra.
555 * @param queryObject query object containing prepared query and values
557 * @throws MusicServiceException
559 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
560 ResultSet results = null;
562 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
563 } catch (MusicQueryException | MusicServiceException e) {
564 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
565 throw new MusicServiceException(e.getMessage());
571 * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
572 * is used to check if the resource is free.
574 * @param keyspace name of the keyspace
575 * @param table name of the table
576 * @param primaryKeyValue primary key value
577 * @param queryObject query object containing prepared query and values
578 * @param lockId lock ID to check if the resource is free to perform the operation.
581 public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
582 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
583 ResultSet results = null;
586 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
587 lockId.substring(lockId.lastIndexOf("$")+1));
588 if(result.getResult().equals(ResultType.FAILURE))
589 return null;//not top of the lock store q
590 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
591 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
592 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
593 .WARN, ErrorTypes.MUSICSERVICEERROR, e);
599 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
601 * @param keyspaceName name of the keyspace
602 * @param tableName name of the table
603 * @param primaryKey primary key value
604 * @param queryObject query object containing prepared query and values
606 * @throws MusicLockingException
607 * @throws MusicServiceException
608 * @throws MusicQueryException
610 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
611 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
612 long start = System.currentTimeMillis();
613 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
614 String lockId = createLockReference(fullyQualifiedKey);
615 long lockCreationTime = System.currentTimeMillis();
616 ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
617 long lockAcqTime = System.currentTimeMillis();
619 if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
620 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
621 voluntaryReleaseLock(fullyQualifiedKey,lockId);
622 return lockAcqResult;
625 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
626 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
627 ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
628 queryObject, lockRef, conditionInfo);
629 long criticalPutTime = System.currentTimeMillis();
630 voluntaryReleaseLock(fullyQualifiedKey,lockId);
631 long lockDeleteTime = System.currentTimeMillis();
632 String timingInfo = "|lock creation time:" + (lockCreationTime - start)
633 + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
634 + "|critical put time:" + (criticalPutTime - lockAcqTime)
635 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
636 criticalPutResult.setTimingInfo(timingInfo);
637 return criticalPutResult;
644 * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
646 * @param keyspaceName name of the keyspace
647 * @param tableName name of the table
648 * @param primaryKey primary key value
649 * @param queryObject query object containing prepared query and values
651 * @throws MusicServiceException
652 * @throws MusicLockingException
653 * @throws MusicQueryException
655 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
656 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
657 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
658 String lockId = createLockReference(fullyQualifiedKey);
659 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
660 ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
661 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
662 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
663 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
665 criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
666 voluntaryReleaseLock(fullyQualifiedKey,lockId);
669 voluntaryReleaseLock(fullyQualifiedKey,lockId);
670 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
681 public Map<String, Object> validateLock(String lockName) {
682 return MusicUtil.validateLock(lockName);
686 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
687 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
692 public List<String> getLockQueue(String fullyQualifiedKey)
693 throws MusicServiceException, MusicQueryException, MusicLockingException {
694 String[] splitString = fullyQualifiedKey.split("\\.");
695 String keyspace = splitString[0];
696 String table = splitString[1];
697 String primaryKeyValue = splitString[2];
699 return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
702 public long getLockQueueSize(String fullyQualifiedKey)
703 throws MusicServiceException, MusicQueryException, MusicLockingException {
704 String[] splitString = fullyQualifiedKey.split("\\.");
705 String keyspace = splitString[0];
706 String table = splitString[1];
707 String primaryKeyValue = splitString[2];
709 return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
712 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
713 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {