2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (c) 2018 IBM.
7 * ===================================================================
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.service.impl;
28 import java.io.StringWriter;
29 import java.util.List;
31 import java.util.StringTokenizer;
32 import java.util.concurrent.TimeUnit;
34 import org.onap.music.datastore.MusicDataStore;
35 import org.onap.music.datastore.MusicDataStoreHandle;
36 import org.onap.music.datastore.PreparedQueryObject;
37 import org.onap.music.eelf.logging.EELFLoggerDelegate;
38 import org.onap.music.eelf.logging.format.AppMessages;
39 import org.onap.music.eelf.logging.format.ErrorSeverity;
40 import org.onap.music.eelf.logging.format.ErrorTypes;
41 import org.onap.music.exceptions.MusicLockingException;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.lockingservice.cassandra.CassaLockStore;
45 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
46 import org.onap.music.lockingservice.cassandra.LockType;
47 import org.onap.music.lockingservice.cassandra.MusicLockState;
48 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
49 import org.onap.music.main.MusicUtil;
50 import org.onap.music.main.ResultType;
51 import org.onap.music.main.ReturnType;
52 import org.onap.music.service.MusicCoreService;
54 import com.att.eelf.configuration.EELFLogger;
55 import com.datastax.driver.core.DataType;
56 import com.datastax.driver.core.ResultSet;
57 import com.datastax.driver.core.Row;
58 import com.datastax.driver.core.TableMetadata;
60 import org.onap.music.datastore.*;
62 public class MusicCassaCore implements MusicCoreService {
// Shared lock-store handle; it is created on first use by getLockingServiceHandle().
64 public static CassaLockStore mLockHandle = null;;
// Logger used by the methods of this class.
65 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
// NOTE(review): not referenced anywhere in the visible part of this file - confirm it is still needed.
66 private static boolean unitTestRun=true;
// Single shared instance, created lazily by getInstance().
67 private static MusicCassaCore musicCassaCoreInstance = null;
// Private constructor: the only visible caller is getInstance(), which hands out the shared instance.
69 private MusicCassaCore() {
72 public static MusicCassaCore getInstance() {
74 if(musicCassaCoreInstance == null) {
75 musicCassaCoreInstance = new MusicCassaCore();
77 return musicCassaCoreInstance;
80 public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
81 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
82 long start = System.currentTimeMillis();
84 if (mLockHandle == null) {
86 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
87 } catch (Exception e) {
88 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
89 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
92 long end = System.currentTimeMillis();
93 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
97 public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
98 return createLockReference(fullyQualifiedKey, LockType.WRITE);
101 public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
102 String[] splitString = fullyQualifiedKey.split("\\.");
103 String keyspace = splitString[0];
104 String table = splitString[1];
105 String lockName = splitString[2];
107 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
108 long start = System.currentTimeMillis();
109 String lockReference = null;
112 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype);
113 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
115 throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
116 } catch (Exception e) {
117 logger.error(EELFLoggerDelegate.applicationLogger, e);
118 throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
120 long end = System.currentTimeMillis();
121 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
122 return lockReference;
126 public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
127 throws MusicLockingException, MusicQueryException, MusicServiceException {
128 evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
129 return acquireLock(fullyQualifiedKey, lockReference);
132 private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod)
133 throws MusicLockingException, MusicQueryException, MusicServiceException {
134 String[] splitString = fullyQualifiedKey.split("\\.");
135 String keyspace = splitString[0];
136 String table = splitString[1];
137 String primaryKeyValue = splitString[2];
139 LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
141 if (!currentLockHolderObject.getIsLockOwner()) { // no lock holder
145 * Release the lock of the previous holder if it has expired. if the update to the acquire time has
146 * not reached due to network delays, simply use the create time as the reference
148 long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.getAcquireTime()),
149 Long.parseLong(currentLockHolderObject.getCreateTime()));
150 if ((System.currentTimeMillis() - referenceTime) > leasePeriod) {
151 forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.getLockRef() + "");
152 logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.getLockRef() + " forcibly released");
156 public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
157 throws MusicLockingException, MusicQueryException, MusicServiceException {
158 String[] splitString = lockId.split("\\.");
159 String keyspace = splitString[0].substring(1);//remove '$'
160 String table = splitString[1];
161 String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
162 String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
163 String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
165 LockObject lockInfo = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue, lockRef);
167 if (!lockInfo.getIsLockOwner()) {
168 return new ReturnType(ResultType.FAILURE, lockId + " is not a lock holder");//not top of the lock store q
171 //check to see if the value of the key has to be synced in case there was a forceful release
172 String syncTable = keyspace+".unsyncedKeys_"+table;
173 String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
174 PreparedQueryObject readQueryObject = new PreparedQueryObject();
175 readQueryObject.appendQueryString(query);
176 ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
177 if (results.all().size() != 0) {
178 logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
180 syncQuorum(keyspace, table, primaryKeyValue);
181 } catch (Exception e) {
182 StringWriter sw = new StringWriter();
183 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
184 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
185 String exceptionAsString = sw.toString();
186 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
188 String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
189 PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
190 deleteQueryObject.appendQueryString(cleanQuery);
191 MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
194 getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
196 return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
203 * @param tableQueryObject
205 * @return Boolean Indicates success or failure
206 * @throws MusicServiceException
210 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
211 String consistency) throws MusicServiceException {
212 boolean result = false;
215 // create shadow locking table
216 result = getLockingServiceHandle().createLockQueue(keyspace, table);
218 return ResultType.FAILURE;
222 // create table to track unsynced_keys
223 table = "unsyncedKeys_" + table;
226 "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " ( key text,PRIMARY KEY (key) );";
227 PreparedQueryObject queryObject = new PreparedQueryObject();
229 queryObject.appendQueryString(tabQuery);
231 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
233 // create actual table
234 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
235 } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
236 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
237 ErrorTypes.MUSICSERVICEERROR);
238 throw new MusicServiceException(ex.getMessage());
240 return result ? ResultType.SUCCESS : ResultType.FAILURE;
243 private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
244 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
245 PreparedQueryObject selectQuery = new PreparedQueryObject();
246 PreparedQueryObject updateQuery = new PreparedQueryObject();
248 // get the primary key d
249 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
250 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
252 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
253 Object cqlFormattedPrimaryKeyValue =
254 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
256 // get the row of data from a quorum
257 selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
258 + primaryKeyName + "= ?" + ";");
259 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
260 MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
261 cqlFormattedPrimaryKeyValue);
269 public ResultSet quorumGet(PreparedQueryObject query) {
270 ResultSet results = null;
272 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
273 } catch (MusicServiceException | MusicQueryException e) {
274 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR,
275 ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
281 public String whoseTurnIsIt(String fullyQualifiedKey) {
282 String[] splitString = fullyQualifiedKey.split("\\.");
283 String keyspace = splitString[0];
284 String table = splitString[1];
285 String primaryKeyValue = splitString[2];
287 LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
288 if (!lockOwner.getIsLockOwner()) {
289 return "No lock holder!";
291 return "$" + fullyQualifiedKey + "$" + lockOwner.getLockRef();
292 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
293 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.LOCKINGERROR + fullyQualifiedKey,
294 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
299 public List<String> getCurrentLockHolders(String fullyQualifiedKey) {
300 String[] splitString = fullyQualifiedKey.split("\\.");
301 String keyspace = splitString[0];
302 String table = splitString[1];
303 String primaryKeyValue = splitString[2];
305 return getLockingServiceHandle().getCurrentLockHolders(keyspace, table, primaryKeyValue);
306 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
307 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
314 * @param lockReference
317 public static String getLockNameFromId(String lockReference) {
318 StringTokenizer st = new StringTokenizer(lockReference);
319 return st.nextToken("$");
323 public void destroyLockRef(String lockId) throws MusicLockingException {
324 long start = System.currentTimeMillis();
325 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
326 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
327 String[] splitString = fullyQualifiedKey.split("\\.");
328 String keyspace = splitString[0];
329 String table = splitString[1];
330 String primaryKeyValue = splitString[2];
332 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
333 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
334 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
335 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
336 throw new MusicLockingException(e.getMessage());
338 long end = System.currentTimeMillis();
339 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
342 public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
343 long start = System.currentTimeMillis();
344 String[] splitString = fullyQualifiedKey.split("\\.");
345 String keyspace = splitString[0];
346 String table = splitString[1];
347 String primaryKeyValue = splitString[2];
349 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
350 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
351 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
352 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
353 throw new MusicLockingException(e.getMessage());
355 long end = System.currentTimeMillis();
356 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
357 return new MusicLockState(LockStatus.UNLOCKED, "");
361 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
362 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
363 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
364 if (voluntaryRelease) {
365 return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
367 return forciblyReleaseLock(fullyQualifiedKey, lockRef);
371 public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
372 throws MusicLockingException {
373 MusicLockState result = null;
375 result = destroyLockRef(fullyQualifiedKey, lockReference);
376 } catch (Exception ex) {
377 logger.info(EELFLoggerDelegate.applicationLogger,
378 "Exception in voluntaryReleaseLock() for " + fullyQualifiedKey + "ref: " + lockReference);
379 throw new MusicLockingException(ex.getMessage());
384 public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
385 String[] splitString = fullyQualifiedKey.split("\\.");
386 String keyspace = splitString[0];
387 String table = splitString[1];
389 //leave a signal that this key could potentially be unsynchronized
390 String syncTable = keyspace+".unsyncedKeys_"+table;
391 PreparedQueryObject queryObject = new PreparedQueryObject();
392 String values = "(?)";
393 queryObject.addValue(fullyQualifiedKey);
394 String insQuery = "insert into "+syncTable+" (key) values "+values+";";
395 queryObject.appendQueryString(insQuery);
397 MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
398 } catch (Exception e) {
399 logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
400 + e.getMessage(), e);
403 //now release the lock
404 return destroyLockRef(fullyQualifiedKey, lockReference);
410 * @throws MusicLockingException
413 public void deleteLock(String lockName) throws MusicLockingException {
414 throw new MusicLockingException("Depreciated Method Delete Lock");
417 // Prepared Query Additions.
423 * @throws MusicServiceException
425 public ReturnType eventualPut(PreparedQueryObject queryObject) {
426 boolean result = false;
428 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
429 } catch (MusicServiceException | MusicQueryException ex) {
430 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
431 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
432 return new ReturnType(ResultType.FAILURE, ex.getMessage());
435 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
437 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
445 * @throws MusicServiceException
// Eventual put that first reads the "guard" column for the key from <keyspace>.lockq_<tablename>,
// derives a write timestamp from it, adds "USING TIMESTAMP" to the query when it is absent, and then
// executes the query with MusicUtil.EVENTUAL consistency.
// NOTE(review): the declaration of `guard` (and so its initial value) is not visible in this excerpt - confirm.
447 public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
448 boolean result = false;
// Query for the guard value of this key in the lock queue table.
450 PreparedQueryObject getGaurd = new PreparedQueryObject();
451 getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
452 getGaurd.addValue(primaryKey);
454 ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
455 Row row = getGaurdResult.one();
457 guard = row.getLong("guard");
// Timestamp combines the guard value with the current wall-clock time.
458 long timeOfWrite = System.currentTimeMillis();
459 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
460 String query = queryObject.getQuery();
// A query that already carries a timestamp clause is left alone.
461 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
// Delete statements take the clause in front of WHERE.
462 if (queryObject.getOperation().equalsIgnoreCase("delete"))
463 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
// Otherwise the clause goes in front of SET.
// NOTE(review): unlike criticalPut there is no branch for "insert" here; a query without SET is left
// unchanged by replaceFirst - confirm this is intended.
465 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
467 queryObject.replaceQueryString(query);
// A failure while reading the guard is only logged; the put below is still attempted.
470 } catch (MusicServiceException | MusicQueryException e) {
471 logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
474 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
475 } catch (MusicServiceException | MusicQueryException ex) {
476 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
477 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
478 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " ", ex);
479 return new ReturnType(ResultType.FAILURE, ex.getMessage());
482 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
484 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
492 * @param primaryKeyValue
497 public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
498 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
499 long start = System.currentTimeMillis();
501 LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
502 lockId.substring(lockId.lastIndexOf("$") + 1));
504 if (!lockObject.getIsLockOwner()) {
505 return new ReturnType(ResultType.FAILURE, lockId + " is not the lock holder");
506 } else if (lockObject.getLocktype() != LockType.WRITE) {
507 return new ReturnType(ResultType.FAILURE,
508 "Attempting to do write operation, but " + lockId + " is a write lock");
511 if (conditionInfo != null) {
513 if (conditionInfo.testCondition() == false)
514 return new ReturnType(ResultType.FAILURE, "Lock acquired but the condition is not true");
515 } catch (Exception e) {
516 logger.error(EELFLoggerDelegate.errorLogger, e);
517 return new ReturnType(ResultType.FAILURE,
518 "Exception thrown while checking the condition, check its sanctity:\n" + e.getMessage());
521 String query = queryObject.getQuery();
522 long timeOfWrite = System.currentTimeMillis();
523 long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$") + 1));
524 long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
525 // TODO: use Statement instead of modifying query
526 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
527 if (queryObject.getOperation().equalsIgnoreCase("delete"))
528 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
529 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
530 query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
532 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
534 queryObject.replaceQueryString(query);
535 MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
536 dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
537 long end = System.currentTimeMillis();
538 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
539 }catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
540 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
541 return new ReturnType(ResultType.FAILURE,
542 "Exception thrown while doing the critical put\n"
545 return new ReturnType(ResultType.SUCCESS, "Update performed");
553 * @return Boolean Indicates success or failure
554 * @throws MusicServiceException
558 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
559 // this is mainly for some functions like keyspace creation etc which does not
560 // really need the bells and whistles of Music locking.
561 boolean result = false;
563 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
564 } catch (MusicQueryException | MusicServiceException ex) {
565 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
566 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
567 throw new MusicServiceException(ex.getMessage());
569 return result ? ResultType.SUCCESS : ResultType.FAILURE;
573 * This method performs DDL operation on cassandra.
575 * @param queryObject query object containing prepared query and values
577 * @throws MusicServiceException
579 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
580 ResultSet results = null;
582 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
583 } catch (MusicQueryException | MusicServiceException e) {
584 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
585 throw new MusicServiceException(e.getMessage());
591 * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
592 * is used to check if the resource is free.
594 * @param keyspace name of the keyspace
595 * @param table name of the table
596 * @param primaryKeyValue primary key value
597 * @param queryObject query object containing prepared query and values
598 * @param lockId lock ID to check if the resource is free to perform the operation.
601 public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
602 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
603 ResultSet results = null;
606 LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
607 lockId.substring(lockId.lastIndexOf("$") + 1));
608 if (!lockObject.getIsLockOwner()) {
609 return null;// not top of the lock store q
611 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
612 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
613 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
614 .WARN, ErrorTypes.MUSICSERVICEERROR, e);
620 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
622 * @param keyspaceName name of the keyspace
623 * @param tableName name of the table
624 * @param primaryKey primary key value
625 * @param queryObject query object containing prepared query and values
627 * @throws MusicLockingException
628 * @throws MusicServiceException
629 * @throws MusicQueryException
631 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
632 PreparedQueryObject queryObject, Condition conditionInfo)
633 throws MusicLockingException, MusicQueryException, MusicServiceException {
634 long start = System.currentTimeMillis();
635 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
636 String lockId = createLockReference(fullyQualifiedKey, LockType.WRITE);
637 long lockCreationTime = System.currentTimeMillis();
638 ReturnType lockAcqResult = null;
639 logger.info(EELFLoggerDelegate.applicationLogger,
640 "***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : " + primaryKey);
641 logger.info(EELFLoggerDelegate.applicationLogger,
642 "***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
643 if (conditionInfo != null) {
644 logger.info(EELFLoggerDelegate.applicationLogger,
645 "***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
648 lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
649 } catch (MusicLockingException ex) {
650 logger.error(EELFLoggerDelegate.errorLogger,
651 "Exception while acquireLockWithLease() in atomic put for key: " + primaryKey);
652 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
653 throw new MusicServiceException(
654 "Cannot perform atomic put for key: " + primaryKey + " : " + ex.getMessage());
656 long lockAcqTime = System.currentTimeMillis();
659 * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { logger.info(EELFLoggerDelegate.
660 * applicationLogger,"unable to acquire lock, id " + lockId);
661 * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
664 logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
665 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
666 ReturnType criticalPutResult = null;
667 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
668 criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef, conditionInfo);
669 long criticalPutTime = System.currentTimeMillis();
670 long lockDeleteTime = System.currentTimeMillis();
671 String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
672 + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
673 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
674 criticalPutResult.setTimingInfo(timingInfo);
676 logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
677 criticalPutResult = lockAcqResult;
680 voluntaryReleaseLock(fullyQualifiedKey, lockId);
681 } catch (MusicLockingException ex) {
682 logger.info(EELFLoggerDelegate.applicationLogger,
683 "Exception occured while deleting lock after atomic put for key: " + primaryKey);
684 criticalPutResult.setMessage(criticalPutResult.getMessage() + "Lock release failed");
686 return criticalPutResult;
692 * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
694 * @param keyspaceName name of the keyspace
695 * @param tableName name of the table
696 * @param primaryKey primary key value
697 * @param queryObject query object containing prepared query and values
699 * @throws MusicServiceException
700 * @throws MusicLockingException
701 * @throws MusicQueryException
703 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
704 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
705 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
706 String lockId = createLockReference(fullyQualifiedKey, LockType.READ);
707 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
708 ReturnType lockAcqResult = null;
709 ResultSet result = null;
710 logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());
712 lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
713 } catch (MusicLockingException ex) {
714 logger.error(EELFLoggerDelegate.errorLogger,
715 "Exception while acquireLockWithLease() in atomic get for key: " + primaryKey);
716 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
717 throw new MusicServiceException(
718 "Cannot perform atomic get for key: " + primaryKey + " : " + ex.getMessage());
720 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
721 logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
722 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
723 result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
725 logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
728 voluntaryReleaseLock(fullyQualifiedKey, lockId);
729 } catch (MusicLockingException ex) {
730 logger.info(EELFLoggerDelegate.applicationLogger,
731 "Exception occured while deleting lock after atomic put for key: " + primaryKey);
732 throw new MusicLockingException(ex.getMessage());
744 public Map<String, Object> validateLock(String lockName) {
745 return MusicUtil.validateLock(lockName);
// Takes the same parameters as atomicPut but declares only MusicLockingException.
// NOTE(review): the body of this method is not visible in this excerpt - confirm what it does.
750 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
751 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
756 public List<String> getLockQueue(String fullyQualifiedKey)
757 throws MusicServiceException, MusicQueryException, MusicLockingException {
758 String[] splitString = fullyQualifiedKey.split("\\.");
759 String keyspace = splitString[0];
760 String table = splitString[1];
761 String primaryKeyValue = splitString[2];
763 return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
766 public long getLockQueueSize(String fullyQualifiedKey)
767 throws MusicServiceException, MusicQueryException, MusicLockingException {
768 String[] splitString = fullyQualifiedKey.split("\\.");
769 String keyspace = splitString[0];
770 String table = splitString[1];
771 String primaryKeyValue = splitString[2];
773 return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
778 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
779 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {