2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (c) 2018 IBM.
7 * ===================================================================
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.service.impl;
28 import java.io.StringWriter;
29 import java.util.List;
31 import java.util.StringTokenizer;
32 import java.util.concurrent.TimeUnit;
34 import org.onap.music.datastore.MusicDataStore;
35 import org.onap.music.datastore.MusicDataStoreHandle;
36 import org.onap.music.datastore.PreparedQueryObject;
37 import org.onap.music.eelf.logging.EELFLoggerDelegate;
38 import org.onap.music.eelf.logging.format.AppMessages;
39 import org.onap.music.eelf.logging.format.ErrorSeverity;
40 import org.onap.music.eelf.logging.format.ErrorTypes;
41 import org.onap.music.exceptions.MusicLockingException;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.lockingservice.cassandra.CassaLockStore;
45 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
46 import org.onap.music.lockingservice.cassandra.LockType;
47 import org.onap.music.lockingservice.cassandra.MusicLockState;
48 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
49 import org.onap.music.main.MusicUtil;
50 import org.onap.music.main.ResultType;
51 import org.onap.music.main.ReturnType;
52 import org.onap.music.service.MusicCoreService;
54 import com.att.eelf.configuration.EELFLogger;
55 import com.datastax.driver.core.DataType;
56 import com.datastax.driver.core.ResultSet;
57 import com.datastax.driver.core.Row;
58 import com.datastax.driver.core.TableMetadata;
60 import org.onap.music.datastore.*;
62 public class MusicCassaCore implements MusicCoreService {
64 public static CassaLockStore mLockHandle = null;;
65 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
66 private static boolean unitTestRun=true;
67 private static MusicCassaCore musicCassaCoreInstance = null;
/** Private so that instances are only created through {@code getInstance()}. */
private MusicCassaCore() {
    // nothing to initialise
}
72 public static MusicCassaCore getInstance() {
74 if(musicCassaCoreInstance == null) {
75 musicCassaCoreInstance = new MusicCassaCore();
77 return musicCassaCoreInstance;
80 public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
81 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
82 long start = System.currentTimeMillis();
84 if (mLockHandle == null) {
86 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
87 } catch (Exception e) {
88 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
89 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
92 long end = System.currentTimeMillis();
93 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
97 public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
98 return createLockReference(fullyQualifiedKey, LockType.WRITE);
101 public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
102 String[] splitString = fullyQualifiedKey.split("\\.");
103 String keyspace = splitString[0];
104 String table = splitString[1];
105 String lockName = splitString[2];
107 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
108 long start = System.currentTimeMillis();
109 String lockReference = null;
112 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype);
113 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
115 throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
116 } catch (Exception e) {
117 logger.error(EELFLoggerDelegate.applicationLogger, e);
118 throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
120 long end = System.currentTimeMillis();
121 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
122 return lockReference;
126 public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
127 throws MusicLockingException, MusicQueryException, MusicServiceException {
128 evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
129 return acquireLock(fullyQualifiedKey, lockReference);
132 private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod)
133 throws MusicLockingException, MusicQueryException, MusicServiceException {
134 String[] splitString = fullyQualifiedKey.split("\\.");
135 String keyspace = splitString[0];
136 String table = splitString[1];
137 String primaryKeyValue = splitString[2];
139 LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
141 if (!currentLockHolderObject.getIsLockOwner()) { // no lock holder
145 * Release the lock of the previous holder if it has expired. if the update to the acquire time has
146 * not reached due to network delays, simply use the create time as the reference
148 long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.getAcquireTime()),
149 Long.parseLong(currentLockHolderObject.getCreateTime()));
150 if ((System.currentTimeMillis() - referenceTime) > leasePeriod) {
151 forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.getLockRef() + "");
152 logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.getLockRef() + " forcibly released");
156 public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
157 throws MusicLockingException, MusicQueryException, MusicServiceException {
158 String[] splitString = lockId.split("\\.");
159 String keyspace = splitString[0].substring(1);//remove '$'
160 String table = splitString[1];
161 String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
162 String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
163 String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
165 LockObject lockInfo = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue, lockRef);
167 if (!lockInfo.getIsLockOwner()) {
168 return new ReturnType(ResultType.FAILURE, lockId + " is not a lock holder");//not top of the lock store q
171 //check to see if the value of the key has to be synced in case there was a forceful release
172 String syncTable = keyspace+".unsyncedKeys_"+table;
173 String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
174 PreparedQueryObject readQueryObject = new PreparedQueryObject();
175 readQueryObject.appendQueryString(query);
176 ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
177 if (results.all().size() != 0) {
178 logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
180 syncQuorum(keyspace, table, primaryKeyValue);
181 } catch (Exception e) {
182 StringWriter sw = new StringWriter();
183 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
184 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
185 String exceptionAsString = sw.toString();
186 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
188 String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
189 PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
190 deleteQueryObject.appendQueryString(cleanQuery);
191 MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
194 getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
196 return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
203 * @param tableQueryObject
205 * @return Boolean Indicates success or failure
206 * @throws MusicServiceException
210 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
211 String consistency) throws MusicServiceException {
212 boolean result = false;
215 // create shadow locking table
216 result = getLockingServiceHandle().createLockQueue(keyspace, table);
218 return ResultType.FAILURE;
222 // create table to track unsynced_keys
223 table = "unsyncedKeys_" + table;
226 "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " ( key text,PRIMARY KEY (key) );";
227 PreparedQueryObject queryObject = new PreparedQueryObject();
229 queryObject.appendQueryString(tabQuery);
231 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
233 // create actual table
234 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
235 } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
236 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
237 ErrorTypes.MUSICSERVICEERROR);
238 throw new MusicServiceException(ex.getMessage());
240 return result ? ResultType.SUCCESS : ResultType.FAILURE;
243 private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
244 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
245 PreparedQueryObject selectQuery = new PreparedQueryObject();
246 PreparedQueryObject updateQuery = new PreparedQueryObject();
248 // get the primary key d
249 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
250 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
252 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
253 Object cqlFormattedPrimaryKeyValue =
254 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
256 // get the row of data from a quorum
257 selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
258 + primaryKeyName + "= ?" + ";");
259 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
260 MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
261 cqlFormattedPrimaryKeyValue);
269 public ResultSet quorumGet(PreparedQueryObject query) {
270 ResultSet results = null;
272 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
273 } catch (MusicServiceException | MusicQueryException e) {
274 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR,
275 ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
281 public String whoseTurnIsIt(String fullyQualifiedKey) {
282 String[] splitString = fullyQualifiedKey.split("\\.");
283 String keyspace = splitString[0];
284 String table = splitString[1];
285 String primaryKeyValue = splitString[2];
287 LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
288 if (!lockOwner.getIsLockOwner()) {
291 return "$" + fullyQualifiedKey + "$" + lockOwner.getLockRef();
292 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
293 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.LOCKINGERROR + fullyQualifiedKey,
294 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
299 public List<String> getCurrentLockHolders(String fullyQualifiedKey) {
300 String[] splitString = fullyQualifiedKey.split("\\.");
301 String keyspace = splitString[0];
302 String table = splitString[1];
303 String primaryKeyValue = splitString[2];
305 return getLockingServiceHandle().getCurrentLockHolders(keyspace, table, primaryKeyValue);
306 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
307 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
314 * @param lockReference
317 public static String getLockNameFromId(String lockReference) {
318 StringTokenizer st = new StringTokenizer(lockReference);
319 return st.nextToken("$");
323 public void destroyLockRef(String lockId) throws MusicLockingException {
324 long start = System.currentTimeMillis();
325 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
326 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
327 String[] splitString = fullyQualifiedKey.split("\\.");
328 String keyspace = splitString[0];
329 String table = splitString[1];
330 String primaryKeyValue = splitString[2];
332 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
333 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
334 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
335 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
336 throw new MusicLockingException(e.getMessage());
338 long end = System.currentTimeMillis();
339 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
342 public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
343 long start = System.currentTimeMillis();
344 String[] splitString = fullyQualifiedKey.split("\\.");
345 String keyspace = splitString[0];
346 String table = splitString[1];
347 String primaryKeyValue = splitString[2];
349 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
350 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
351 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
352 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
353 throw new MusicLockingException(e.getMessage());
355 long end = System.currentTimeMillis();
356 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
357 return new MusicLockState(LockStatus.UNLOCKED, "");
361 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
362 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
363 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
364 if (voluntaryRelease) {
365 return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
367 return forciblyReleaseLock(fullyQualifiedKey, lockRef);
371 public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
372 throws MusicLockingException {
373 MusicLockState result = null;
375 result = destroyLockRef(fullyQualifiedKey, lockReference);
376 } catch (Exception ex) {
377 logger.info(EELFLoggerDelegate.applicationLogger,
378 "Exception in voluntaryReleaseLock() for " + fullyQualifiedKey + "ref: " + lockReference);
379 throw new MusicLockingException(ex.getMessage());
384 public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
385 String[] splitString = fullyQualifiedKey.split("\\.");
386 String keyspace = splitString[0];
387 String table = splitString[1];
389 //leave a signal that this key could potentially be unsynchronized
390 String syncTable = keyspace+".unsyncedKeys_"+table;
391 PreparedQueryObject queryObject = new PreparedQueryObject();
392 String values = "(?)";
393 queryObject.addValue(fullyQualifiedKey);
394 String insQuery = "insert into "+syncTable+" (key) values "+values+";";
395 queryObject.appendQueryString(insQuery);
397 MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
398 } catch (Exception e) {
399 logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
400 + e.getMessage(), e);
403 //now release the lock
404 return destroyLockRef(fullyQualifiedKey, lockReference);
410 * @throws MusicLockingException
413 public void deleteLock(String lockName) throws MusicLockingException {
414 throw new MusicLockingException("Depreciated Method Delete Lock");
417 // Prepared Query Additions.
423 * @throws MusicServiceException
425 public ReturnType eventualPut(PreparedQueryObject queryObject) {
426 boolean result = false;
428 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
429 } catch (MusicServiceException | MusicQueryException ex) {
430 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
431 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
432 return new ReturnType(ResultType.FAILURE, ex.getMessage());
435 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
437 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
445 * @throws MusicServiceException
447 public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
448 boolean result = false;
450 PreparedQueryObject getGaurd = new PreparedQueryObject();
451 getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
452 getGaurd.addValue(primaryKey);
454 ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
455 Row row = getGaurdResult.one();
457 guard = row.getLong("guard");
458 long timeOfWrite = System.currentTimeMillis();
459 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
460 String query = queryObject.getQuery();
461 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
462 if (queryObject.getOperation().equalsIgnoreCase("delete"))
463 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
465 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
467 queryObject.replaceQueryString(query);
470 } catch (MusicServiceException | MusicQueryException e) {
471 logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
474 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
475 } catch (MusicServiceException | MusicQueryException ex) {
476 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
477 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
478 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " ", ex);
479 return new ReturnType(ResultType.FAILURE, ex.getMessage());
482 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
484 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
492 * @param primaryKeyValue
497 public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
498 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
499 long start = System.currentTimeMillis();
501 String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
502 if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
503 return new ReturnType(ResultType.FAILURE,"Lock value '" + keyLock + "' and key value '"
504 + primaryKeyValue + "' not match. Please check your values: "
507 LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
508 lockId.substring(lockId.lastIndexOf("$") + 1));
510 if ( lockObject == null ) {
511 return new ReturnType(ResultType.FAILURE, lockId + " does not exist.");
512 } else if (!lockObject.getIsLockOwner()) {
513 return new ReturnType(ResultType.FAILURE, lockId + " is not the lock holder");
514 } else if (lockObject.getLocktype() != LockType.WRITE) {
515 return new ReturnType(ResultType.FAILURE,
516 "Attempting to do write operation, but " + lockId + " is a read lock");
519 if (conditionInfo != null) {
521 if (conditionInfo.testCondition() == false)
522 return new ReturnType(ResultType.FAILURE, "Lock acquired but the condition is not true");
523 } catch (Exception e) {
524 logger.error(EELFLoggerDelegate.errorLogger, e);
525 return new ReturnType(ResultType.FAILURE,
526 "Exception thrown while checking the condition, check its sanctity:\n" + e.getMessage());
529 String query = queryObject.getQuery();
530 long timeOfWrite = System.currentTimeMillis();
531 long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$") + 1));
532 long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
533 // TODO: use Statement instead of modifying query
534 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
535 if (queryObject.getOperation().equalsIgnoreCase("delete"))
536 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
537 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
538 query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
540 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
542 queryObject.replaceQueryString(query);
543 MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
544 dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
545 long end = System.currentTimeMillis();
546 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
547 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
548 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
549 return new ReturnType(ResultType.FAILURE,
550 "Exception thrown while doing the critical put: "
553 return new ReturnType(ResultType.SUCCESS, "Update performed");
561 * @return Boolean Indicates success or failure
562 * @throws MusicServiceException
566 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException,MusicQueryException {
567 // this is mainly for some functions like keyspace creation etc which does not
568 // really need the bells and whistles of Music locking.
569 boolean result = false;
571 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
572 // } catch (MusicQueryException | MusicServiceException ex) {
573 // logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
574 // ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
575 // throw new MusicServiceException(ex.getMessage(),ex);
577 return result ? ResultType.SUCCESS : ResultType.FAILURE;
581 * This method performs DDL operation on cassandra.
583 * @param queryObject query object containing prepared query and values
585 * @throws MusicServiceException
587 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
588 ResultSet results = null;
590 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
591 } catch (MusicQueryException | MusicServiceException e) {
592 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
593 throw new MusicServiceException(e.getMessage());
599 * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
600 * is used to check if the resource is free.
602 * @param keyspace name of the keyspace
603 * @param table name of the table
604 * @param primaryKeyValue primary key value
605 * @param queryObject query object containing prepared query and values
606 * @param lockId lock ID to check if the resource is free to perform the operation.
609 public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
610 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
611 ResultSet results = null;
612 String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
614 if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
615 throw new MusicLockingException("Lock value '" + keyLock + "' and key value '"
616 + primaryKeyValue + "' do not match. Please check your values: "
619 LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
620 lockId.substring(lockId.lastIndexOf("$") + 1));
621 if (null == lockObject) {
622 throw new MusicLockingException("No Lock Object. Please check if lock name or key is correct."
625 if ( !lockObject.getIsLockOwner()) {
626 return null;// not top of the lock store q
628 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
629 } catch ( MusicLockingException e ) {
630 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
631 .WARN, ErrorTypes.MUSICSERVICEERROR);
632 throw new MusicServiceException(
633 "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());
634 } catch (MusicQueryException | MusicServiceException e) {
635 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
636 .WARN, ErrorTypes.MUSICSERVICEERROR, e);
637 throw new MusicServiceException(
638 "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());
644 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
646 * @param keyspaceName name of the keyspace
647 * @param tableName name of the table
648 * @param primaryKey primary key value
649 * @param queryObject query object containing prepared query and values
651 * @throws MusicLockingException
652 * @throws MusicServiceException
653 * @throws MusicQueryException
655 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
656 PreparedQueryObject queryObject, Condition conditionInfo)
657 throws MusicLockingException, MusicQueryException, MusicServiceException {
658 long start = System.currentTimeMillis();
659 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
660 String lockId = createLockReference(fullyQualifiedKey, LockType.WRITE);
661 long lockCreationTime = System.currentTimeMillis();
662 ReturnType lockAcqResult = null;
663 logger.info(EELFLoggerDelegate.applicationLogger,
664 "***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : " + primaryKey);
665 logger.info(EELFLoggerDelegate.applicationLogger,
666 "***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
667 if (conditionInfo != null) {
668 logger.info(EELFLoggerDelegate.applicationLogger,
669 "***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
672 lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
673 } catch (MusicLockingException ex) {
674 logger.error(EELFLoggerDelegate.errorLogger,
675 "Exception while acquireLockWithLease() in atomic put for key: " + primaryKey);
676 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
677 throw new MusicServiceException(
678 "Cannot perform atomic put for key: " + primaryKey + " : " + ex.getMessage());
680 long lockAcqTime = System.currentTimeMillis();
683 * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { logger.info(EELFLoggerDelegate.
684 * applicationLogger,"unable to acquire lock, id " + lockId);
685 * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
688 logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
689 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
690 ReturnType criticalPutResult = null;
691 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
692 criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef, conditionInfo);
693 long criticalPutTime = System.currentTimeMillis();
694 long lockDeleteTime = System.currentTimeMillis();
695 String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
696 + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
697 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
698 criticalPutResult.setTimingInfo(timingInfo);
700 logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
701 criticalPutResult = lockAcqResult;
704 voluntaryReleaseLock(fullyQualifiedKey, lockId);
705 } catch (MusicLockingException ex) {
706 logger.info(EELFLoggerDelegate.applicationLogger,
707 "Exception occured while deleting lock after atomic put for key: " + primaryKey);
708 criticalPutResult.setMessage(criticalPutResult.getMessage() + "Lock release failed");
710 return criticalPutResult;
716 * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
718 * @param keyspaceName name of the keyspace
719 * @param tableName name of the table
720 * @param primaryKey primary key value
721 * @param queryObject query object containing prepared query and values
723 * @throws MusicServiceException
724 * @throws MusicLockingException
725 * @throws MusicQueryException
727 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
728 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
729 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
730 String lockId = createLockReference(fullyQualifiedKey, LockType.READ);
731 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
732 ReturnType lockAcqResult = null;
733 ResultSet result = null;
734 logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());
736 lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
737 } catch (MusicLockingException ex) {
738 logger.error(EELFLoggerDelegate.errorLogger,
739 "Exception while acquireLockWithLease() in atomic get for key: " + primaryKey);
740 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
741 throw new MusicServiceException(
742 "Cannot perform atomic get for key: " + primaryKey + " : " + ex.getMessage());
744 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
745 logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
746 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
747 result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
749 logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
752 voluntaryReleaseLock(fullyQualifiedKey, lockId);
753 } catch (MusicLockingException ex) {
754 logger.info(EELFLoggerDelegate.applicationLogger,
755 "Exception occured while deleting lock after atomic put for key: " + primaryKey);
756 throw new MusicLockingException(ex.getMessage());
768 public Map<String, Object> validateLock(String lockName) {
769 return MusicUtil.validateLock(lockName);
774 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
775 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
780 public List<String> getLockQueue(String fullyQualifiedKey)
781 throws MusicServiceException, MusicQueryException, MusicLockingException {
782 String[] splitString = fullyQualifiedKey.split("\\.");
783 String keyspace = splitString[0];
784 String table = splitString[1];
785 String primaryKeyValue = splitString[2];
787 return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
790 public long getLockQueueSize(String fullyQualifiedKey)
791 throws MusicServiceException, MusicQueryException, MusicLockingException {
792 String[] splitString = fullyQualifiedKey.split("\\.");
793 String keyspace = splitString[0];
794 String table = splitString[1];
795 String primaryKeyValue = splitString[2];
797 return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
802 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
803 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {