2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (c) 2018 IBM.
7 * ===================================================================
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.service.impl;
28 import java.io.StringWriter;
29 import java.util.List;
31 import java.util.StringTokenizer;
33 import org.onap.music.datastore.MusicDataStore;
34 import org.onap.music.datastore.MusicDataStoreHandle;
35 import org.onap.music.datastore.PreparedQueryObject;
36 import org.onap.music.eelf.logging.EELFLoggerDelegate;
37 import org.onap.music.eelf.logging.format.AppMessages;
38 import org.onap.music.eelf.logging.format.ErrorSeverity;
39 import org.onap.music.eelf.logging.format.ErrorTypes;
40 import org.onap.music.exceptions.MusicLockingException;
41 import org.onap.music.exceptions.MusicQueryException;
42 import org.onap.music.exceptions.MusicServiceException;
43 import org.onap.music.lockingservice.cassandra.CassaLockStore;
44 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
45 import org.onap.music.lockingservice.cassandra.LockType;
46 import org.onap.music.lockingservice.cassandra.MusicLockState;
47 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
48 import org.onap.music.main.MusicUtil;
49 import org.onap.music.main.ResultType;
50 import org.onap.music.main.ReturnType;
51 import org.onap.music.service.MusicCoreService;
53 import com.datastax.driver.core.DataType;
54 import com.datastax.driver.core.ResultSet;
55 import com.datastax.driver.core.Row;
56 import com.datastax.driver.core.TableMetadata;
58 import org.onap.music.datastore.*;
/**
 * {@link MusicCoreService} implementation that works through {@link CassaLockStore}
 * for locking and {@link MusicDataStoreHandle} for data access.
 * State is kept in static fields: the lock-store handle, the logger and one shared instance.
 */
60 public class MusicCassaCore implements MusicCoreService {
// Shared lock-store handle; starts out null and is assigned by getLockingServiceHandle() or setmLockHandle().
62 private static CassaLockStore mLockHandle = null;
// Logger used by every method of this class.
63 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
// Instance handed out by getInstance(); null until it is first requested.
64 private static MusicCassaCore musicCassaCoreInstance = null;
// Private constructor: callers obtain the instance through getInstance() instead of using new.
66 private MusicCassaCore() {
67 // not going to happen
// Static accessor for the lock-store handle.
// NOTE(review): presumably returns the mLockHandle field without initializing it — confirm against the body.
70 public static CassaLockStore getmLockHandle() {
/**
 * Replaces the shared lock-store handle.
 *
 * @param mLockHandle handle to store in the static field; a non-null value makes
 *                    getLockingServiceHandle() skip creating its own
 */
74 public static void setmLockHandle(CassaLockStore mLockHandle) {
// Qualified with the class name because the parameter shadows the static field.
75 MusicCassaCore.mLockHandle = mLockHandle;
/**
 * Returns the shared MusicCassaCore instance, creating it the first time it is requested.
 * NOTE(review): no synchronization is visible around the null check — confirm whether
 * concurrent first calls are possible.
 *
 * @return the shared instance
 */
78 public static MusicCassaCore getInstance() {
80 if(musicCassaCoreInstance == null) {
81 musicCassaCoreInstance = new MusicCassaCore();
83 return musicCassaCoreInstance;
/**
 * Provides the lock-store handle, building a CassaLockStore on top of the data-store
 * handle when the static field is still null. The elapsed time is logged.
 * NOTE(review): presumably returns mLockHandle at the end — confirm.
 *
 * @throws MusicLockingException when constructing the CassaLockStore fails for any reason
 */
89 public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
90 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
91 long start = System.currentTimeMillis();
// Only create the handle once; later calls reuse the stored one.
93 if (mLockHandle == null) {
95 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
96 } catch (Exception e) {
97 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
// The original exception is only carried as text (its toString()) inside the new message.
98 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
101 long end = System.currentTimeMillis();
102 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
/**
 * Creates a lock reference of type {@link LockType#WRITE} for the given key by
 * delegating to the two-argument overload.
 *
 * @param fullyQualifiedKey key in the form keyspace.table.lockName
 * @return the lock reference produced by the two-argument overload
 * @throws MusicLockingException when the reference cannot be created
 */
106 public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
107 return createLockReference(fullyQualifiedKey, LockType.WRITE);
/**
 * Creates a lock reference for a key and enqueues it in the lock store.
 * The key is split on '.' into keyspace, table and lock name, which are passed to
 * genLockRefandEnQueue together with the lock type. The elapsed time is logged.
 *
 * @param fullyQualifiedKey key in the form keyspace.table.lockName (three dot-separated parts are read)
 * @param locktype type of lock to enqueue
 * @return the value returned by genLockRefandEnQueue, converted to a String
 * @throws MusicLockingException when the lock store call fails; only the message of the
 *         original exception is carried over
 */
110 public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
111 String[] splitString = fullyQualifiedKey.split("\\.");
112 String keyspace = splitString[0];
113 String table = splitString[1];
114 String lockName = splitString[2];
116 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
117 long start = System.currentTimeMillis();
118 String lockReference = null;
// The leading "" turns whatever genLockRefandEnQueue returns into a String.
121 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype);
122 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
124 throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
// Any other exception is logged and reported with the same message prefix.
125 } catch (Exception e) {
126 logger.error(EELFLoggerDelegate.applicationLogger, e);
127 throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
129 long end = System.currentTimeMillis();
130 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
131 return lockReference;
/**
 * Acquires a lock after first evicting the current holder if its lease has run out.
 *
 * @param fullyQualifiedKey key in the form keyspace.table.primaryKey, used for the eviction check
 * @param lockReference lock id passed on to acquireLock
 * @param leasePeriod lease length; compared against a System.currentTimeMillis() difference
 * @return the result of acquireLock
 */
135 public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
136 throws MusicLockingException, MusicQueryException, MusicServiceException {
// Eviction runs first so that an expired holder does not block this acquisition attempt.
137 evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
138 return acquireLock(fullyQualifiedKey, lockReference);
/**
 * Forcibly releases the lock at the head of the queue when it has been held longer than
 * the lease period.
 * The reference time is the later of the holder's acquire time and create time; the lock
 * is released when the current time minus that reference exceeds leasePeriod.
 * NOTE(review): when the head of the queue is not a lock owner the method presumably
 * returns early — confirm.
 *
 * @param fullyQualifiedKey key in the form keyspace.table.primaryKey
 * @param leasePeriod lease length, compared against a millisecond difference
 */
141 private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod)
142 throws MusicLockingException, MusicQueryException, MusicServiceException {
143 String[] splitString = fullyQualifiedKey.split("\\.");
144 String keyspace = splitString[0];
145 String table = splitString[1];
146 String primaryKeyValue = splitString[2];
// Look at the head of the lock queue without removing it.
148 LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
150 if (!currentLockHolderObject.getIsLockOwner()) { // no lock holder
154 * Release the lock of the previous holder if it has expired. if the update to the acquire time has
155 * not reached due to network delays, simply use the create time as the reference
// Both times are stored as strings and parsed as longs here.
157 long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.getAcquireTime()),
158 Long.parseLong(currentLockHolderObject.getCreateTime()));
159 if ((System.currentTimeMillis() - referenceTime) > leasePeriod) {
160 forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.getLockRef() + "");
161 logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.getLockRef() + " forcibly released");
165 public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
166 throws MusicLockingException, MusicQueryException, MusicServiceException {
167 String[] splitString = lockId.split("\\.");
168 String keyspace = splitString[0].substring(1);//remove '$'
169 String table = splitString[1];
170 String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
171 String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
172 String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
174 LockObject lockInfo = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue, lockRef);
176 if (!lockInfo.getIsLockOwner()) {
177 return new ReturnType(ResultType.FAILURE, lockId + " is not a lock holder");//not top of the lock store q
180 //check to see if the value of the key has to be synced in case there was a forceful release
181 String syncTable = keyspace+".unsyncedKeys_"+table;
182 String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
183 PreparedQueryObject readQueryObject = new PreparedQueryObject();
184 readQueryObject.appendQueryString(query);
185 ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
186 if (results.all().size() != 0) {
187 logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
189 syncQuorum(keyspace, table, primaryKeyValue);
190 } catch (Exception e) {
191 StringWriter sw = new StringWriter();
192 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
193 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
194 String exceptionAsString = sw.toString();
195 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
197 String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
198 PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
199 deleteQueryObject.appendQueryString(cleanQuery);
200 MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
203 getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
205 return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
212 * @param tableQueryObject
214 * @return Boolean Indicates success or failure
215 * @throws MusicServiceException
/**
 * Creates a table together with its supporting tables: the lock queue for the table,
 * an "unsyncedKeys_" table with a single text key column, and finally the table described
 * by tableQueryObject.
 *
 * @param keyspace keyspace that receives the tables
 * @param table name of the table; used for the lock queue and the unsyncedKeys_ prefix
 * @param tableQueryObject statement that creates the actual table
 * @param consistency consistency level used for the actual table creation
 * @return SUCCESS or FAILURE depending on the last boolean result
 * @throws MusicServiceException wrapping the message of any query, service or locking failure
 */
219 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
220 String consistency) throws MusicServiceException {
221 boolean result = false;
224 // create shadow locking table
225 result = getLockingServiceHandle().createLockQueue(keyspace, table);
// NOTE(review): presumably guarded by a check that the lock queue creation failed — confirm.
227 return ResultType.FAILURE;
231 // create table to track unsynced_keys
// The parameter is reused to hold the name of the tracking table.
232 table = "unsyncedKeys_" + table;
235 "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " ( key text,PRIMARY KEY (key) );";
236 PreparedQueryObject queryObject = new PreparedQueryObject();
238 queryObject.appendQueryString(tabQuery);
// The tracking table is always created with "eventual" consistency.
240 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
242 // create actual table
243 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
244 } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
245 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
246 ErrorTypes.MUSICSERVICEERROR);
247 throw new MusicServiceException(ex.getMessage());
249 return result ? ResultType.SUCCESS : ResultType.FAILURE;
/**
 * Re-synchronizes one row: builds a SELECT for the row by its primary key and hands it,
 * together with an empty update query object, to MusicUtil.writeBackToQuorum.
 * Only the first primary-key column of the table is used.
 *
 * @param keyspace keyspace of the table
 * @param table table that holds the row
 * @param primaryKeyValue primary key value as text; converted to the column's data type
 * @throws Exception anything raised by the metadata lookup, conversion or write-back
 */
252 private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
253 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
254 PreparedQueryObject selectQuery = new PreparedQueryObject();
255 PreparedQueryObject updateQuery = new PreparedQueryObject();
257 // get the primary key d
258 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
259 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
261 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
// Convert the textual key into the Java type matching the column's CQL type.
262 Object cqlFormattedPrimaryKeyValue =
263 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
265 // get the row of data from a quorum
266 selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
267 + primaryKeyName + "= ?" + ";");
268 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
269 MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
270 cqlFormattedPrimaryKeyValue);
/**
 * Runs a read with executeQuorumConsistencyGet.
 * Service and query failures are logged and not rethrown (the signature declares no exceptions).
 * NOTE(review): presumably returns results, which stays null after a failure — confirm.
 *
 * @param query prepared query to execute
 */
278 public ResultSet quorumGet(PreparedQueryObject query) {
279 ResultSet results = null;
281 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
282 } catch (MusicServiceException | MusicQueryException e) {
283 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR,
284 ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
/**
 * Reports the lock id at the head of the lock queue for a key.
 * Locking, service and query failures are logged and not rethrown.
 * NOTE(review): the results for "head is not a lock owner" and for the failure path are
 * not shown here; presumably null — confirm.
 *
 * @param fullyQualifiedKey key in the form keyspace.table.primaryKey
 * @return "$" + fullyQualifiedKey + "$" + lock reference when the head of the queue owns the lock
 */
290 public String whoseTurnIsIt(String fullyQualifiedKey) {
291 String[] splitString = fullyQualifiedKey.split("\\.");
292 String keyspace = splitString[0];
293 String table = splitString[1];
294 String primaryKeyValue = splitString[2];
296 LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
297 if (!lockOwner.getIsLockOwner()) {
// Same lock id layout that acquireLock and releaseLock parse.
300 return "$" + fullyQualifiedKey + "$" + lockOwner.getLockRef();
301 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
302 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.LOCKINGERROR + fullyQualifiedKey,
303 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
/**
 * Returns the current lock holders for a key as reported by the lock store.
 * Locking, service and query failures are logged and not rethrown.
 * NOTE(review): the value returned after a failure is not shown here; presumably null — confirm.
 *
 * @param fullyQualifiedKey key in the form keyspace.table.primaryKey
 * @return the list returned by CassaLockStore.getCurrentLockHolders
 */
308 public List<String> getCurrentLockHolders(String fullyQualifiedKey) {
309 String[] splitString = fullyQualifiedKey.split("\\.");
310 String keyspace = splitString[0];
311 String table = splitString[1];
312 String primaryKeyValue = splitString[2];
314 return getLockingServiceHandle().getCurrentLockHolders(keyspace, table, primaryKeyValue);
315 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
316 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
323 * @param lockReference
326 public static String getLockNameFromId(String lockReference) {
327 StringTokenizer st = new StringTokenizer(lockReference);
328 return st.nextToken("$");
/**
 * Removes a lock reference from the lock queue, given the full lock id.
 * The id is split into the fully qualified key (between the first character and the last '$')
 * and the reference (after the last '$'); the key is then split on '.'.
 * The elapsed time is logged.
 *
 * @param lockId lock id in the form $keyspace.table.primaryKey$lockRef
 * @throws MusicLockingException carrying the message of any locking, service or query failure
 */
332 public void destroyLockRef(String lockId) throws MusicLockingException {
333 long start = System.currentTimeMillis();
334 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
335 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
336 String[] splitString = fullyQualifiedKey.split("\\.");
337 String keyspace = splitString[0];
338 String table = splitString[1];
339 String primaryKeyValue = splitString[2];
// The retry count comes from MusicUtil configuration.
341 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
342 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
343 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
344 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
345 throw new MusicLockingException(e.getMessage());
347 long end = System.currentTimeMillis();
348 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
/**
 * Removes a lock reference from the lock queue, given the key and the reference separately.
 * The elapsed time is logged.
 *
 * @param fullyQualifiedKey key in the form keyspace.table.primaryKey
 * @param lockReference reference passed unchanged to deQueueLockRef
 * @return a new MusicLockState with status UNLOCKED and an empty second argument
 * @throws MusicLockingException carrying the message of any locking, service or query failure
 */
351 public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
352 long start = System.currentTimeMillis();
353 String[] splitString = fullyQualifiedKey.split("\\.");
354 String keyspace = splitString[0];
355 String table = splitString[1];
356 String primaryKeyValue = splitString[2];
358 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
359 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
360 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
361 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
362 throw new MusicLockingException(e.getMessage());
364 long end = System.currentTimeMillis();
365 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
366 return new MusicLockState(LockStatus.UNLOCKED, "");
370 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
371 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
372 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
373 if (voluntaryRelease) {
374 return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
376 return forciblyReleaseLock(fullyQualifiedKey, lockRef);
/**
 * Releases a lock by removing its reference from the queue via destroyLockRef.
 * NOTE(review): presumably returns result after the try block — confirm.
 *
 * @param fullyQualifiedKey key in the form keyspace.table.primaryKey
 * @param lockReference reference to remove
 * @throws MusicLockingException carrying the message of any exception raised by destroyLockRef
 */
380 public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
381 throws MusicLockingException {
382 MusicLockState result = null;
384 result = destroyLockRef(fullyQualifiedKey, lockReference);
// Every exception type is caught; it is logged at info level without the exception itself.
385 } catch (Exception ex) {
386 logger.info(EELFLoggerDelegate.applicationLogger,
387 "Exception in voluntaryReleaseLock() for " + fullyQualifiedKey + "ref: " + lockReference);
388 throw new MusicLockingException(ex.getMessage());
/**
 * Releases a lock on behalf of another holder.
 * Before removing the reference, the key is inserted into the keyspace's
 * unsyncedKeys_<table> table so that acquireLock can detect it and sync the row.
 * A failure of that insert is logged; the lock reference is then removed via destroyLockRef.
 *
 * @param fullyQualifiedKey key in the form keyspace.table.primaryKey; also the value inserted
 * @param lockReference reference to remove
 * @return the state returned by destroyLockRef
 * @throws MusicLockingException when destroyLockRef fails
 */
393 public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
394 String[] splitString = fullyQualifiedKey.split("\\.");
395 String keyspace = splitString[0];
396 String table = splitString[1];
398 //leave a signal that this key could potentially be unsynchronized
399 String syncTable = keyspace+".unsyncedKeys_"+table;
400 PreparedQueryObject queryObject = new PreparedQueryObject();
// The key is supplied as a bound value rather than concatenated into the statement.
401 String values = "(?)";
402 queryObject.addValue(fullyQualifiedKey);
403 String insQuery = "insert into "+syncTable+" (key) values "+values+";";
404 queryObject.appendQueryString(insQuery);
406 MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
407 } catch (Exception e) {
408 logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
409 + e.getMessage(), e);
412 //now release the lock
413 return destroyLockRef(fullyQualifiedKey, lockReference);
419 * @throws MusicLockingException
/**
 * Unsupported operation: always throws.
 *
 * @param lockName ignored
 * @throws MusicLockingException always, with the message "Depreciated Method Delete Lock"
 */
422 public void deleteLock(String lockName) throws MusicLockingException {
423 throw new MusicLockingException("Depreciated Method Delete Lock");
426 // Prepared Query Additions.
432 * @throws MusicServiceException
/**
 * Executes a write with MusicUtil.EVENTUAL consistency.
 * Exceptions are converted into a FAILURE ReturnType instead of being thrown.
 * NOTE(review): the SUCCESS/FAILURE returns at the end are presumably selected by the
 * boolean result of executePut — confirm.
 *
 * @param queryObject prepared write to execute
 * @return SUCCESS or FAILURE with a descriptive message; on exception, FAILURE with the exception message
 */
434 public ReturnType eventualPut(PreparedQueryObject queryObject) {
435 boolean result = false;
437 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
438 } catch (MusicServiceException | MusicQueryException ex) {
439 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
440 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
441 return new ReturnType(ResultType.FAILURE, ex.getMessage());
444 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
446 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
454 * @throws MusicServiceException
/**
 * Executes a write with MusicUtil.EVENTUAL consistency after stamping it with a timestamp
 * derived from the key's guard value.
 * The guard is read from the lockq_<tablename> table for the given key and combined with
 * the current time by MusicUtil.v2sTimeStampInMicroseconds. If the statement does not
 * already contain "USING TIMESTAMP", the clause is inserted into the query text.
 * A failure while reading the guard is logged; failures of the write are returned as FAILURE.
 * NOTE(review): the final SUCCESS/FAILURE returns are presumably selected by the boolean
 * result of executePut — confirm.
 *
 * @param queryObject prepared write; its query string may be replaced
 * @param keyspace keyspace that holds the lockq_ table
 * @param tablename table name used to build the lockq_ table name
 * @param primaryKey key whose guard is looked up
 * @return SUCCESS or FAILURE with a descriptive message
 */
456 public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
457 boolean result = false;
459 PreparedQueryObject getGaurd = new PreparedQueryObject();
460 getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
461 getGaurd.addValue(primaryKey);
463 ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
464 Row row = getGaurdResult.one();
466 guard = row.getLong("guard");
467 long timeOfWrite = System.currentTimeMillis();
468 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
469 String query = queryObject.getQuery();
// Leave statements alone that already carry an explicit timestamp.
470 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
// For deletes the clause goes in front of WHERE; the other replacement puts it in front of SET.
471 if (queryObject.getOperation().equalsIgnoreCase("delete"))
472 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
474 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
476 queryObject.replaceQueryString(query);
479 } catch (MusicServiceException | MusicQueryException e) {
480 logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
483 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
484 } catch (MusicServiceException | MusicQueryException ex) {
485 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
486 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
487 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " ", ex);
488 return new ReturnType(ResultType.FAILURE, ex.getMessage());
491 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
493 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
501 * @param primaryKeyValue
/**
 * Executes a write with MusicUtil.CRITICAL consistency on behalf of a lock holder.
 * Checks performed before the write, each producing a FAILURE ReturnType:
 * the key embedded in the lock id (when the id contains a '.') must equal primaryKeyValue;
 * the lock must exist; it must be the lock owner; it must be a WRITE lock; and the
 * optional condition must test true. The statement is then stamped with a timestamp
 * derived from the lock reference number and the current time, unless it already
 * contains "USING TIMESTAMP". The elapsed time is logged.
 *
 * @param keyspace keyspace of the row
 * @param table table of the row
 * @param primaryKeyValue primary key of the row being written
 * @param queryObject prepared write; its query string is replaced with the stamped version
 * @param lockId lock id; the text after the last '$' must parse as a long
 * @param conditionInfo optional precondition; skipped when null
 * @return SUCCESS with "Update performed", or FAILURE with the reason
 */
506 public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
507 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
508 long start = System.currentTimeMillis();
// Text between the last '.' and the last '$' of the lock id, i.e. the key the lock was taken for.
510 String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
511 if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
512 return new ReturnType(ResultType.FAILURE,"Lock value '" + keyLock + "' and key value '"
513 + primaryKeyValue + "' not match. Please check your values: "
516 LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
517 lockId.substring(lockId.lastIndexOf("$") + 1));
519 if ( lockObject == null ) {
520 return new ReturnType(ResultType.FAILURE, lockId + " does not exist.");
521 } else if (!lockObject.getIsLockOwner()) {
522 return new ReturnType(ResultType.FAILURE, lockId + " is not the lock holder");
523 } else if (lockObject.getLocktype() != LockType.WRITE) {
524 return new ReturnType(ResultType.FAILURE,
525 "Attempting to do write operation, but " + lockId + " is a read lock");
528 if (conditionInfo != null) {
530 if (conditionInfo.testCondition() == false)
531 return new ReturnType(ResultType.FAILURE, "Lock acquired but the condition is not true");
// Any exception thrown while evaluating the condition is reported as a failed put.
532 } catch (Exception e) {
533 logger.error(EELFLoggerDelegate.errorLogger, e);
534 return new ReturnType(ResultType.FAILURE,
535 "Exception thrown while checking the condition, check its sanctity:\n" + e.getMessage());
538 String query = queryObject.getQuery();
539 long timeOfWrite = System.currentTimeMillis();
// The lock reference number feeds into the write timestamp.
540 long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$") + 1));
541 long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
542 // TODO: use Statement instead of modifying query
543 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
// Placement of the clause depends on the operation: before WHERE for delete, before the
// terminating ';' for insert, and before SET in the remaining replacement.
544 if (queryObject.getOperation().equalsIgnoreCase("delete"))
545 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
546 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
547 query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
549 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
551 queryObject.replaceQueryString(query);
552 MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
// The boolean returned by executePut is not inspected here.
553 dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
554 long end = System.currentTimeMillis();
555 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
556 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
557 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
558 return new ReturnType(ResultType.FAILURE,
559 "Exception thrown while doing the critical put: "
562 return new ReturnType(ResultType.SUCCESS, "Update performed");
570 * @return Boolean Indicates success or failure
571 * @throws MusicServiceException
/**
 * Executes a write that does not involve locking, at the requested consistency level.
 * Exceptions from the data store propagate to the caller unchanged.
 *
 * @param queryObject prepared write to execute
 * @param consistency consistency level passed to executePut
 * @return SUCCESS when executePut returns true, otherwise FAILURE
 * @throws MusicServiceException when the data store call fails
 * @throws MusicQueryException when the query is rejected
 */
575 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException,MusicQueryException {
576 // this is mainly for some functions like keyspace creation etc which does not
577 // really need the bells and whistles of Music locking.
578 boolean result = false;
580 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
581 // } catch (MusicQueryException | MusicServiceException ex) {
582 // logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
583 // ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
584 // throw new MusicServiceException(ex.getMessage(),ex);
586 return result ? ResultType.SUCCESS : ResultType.FAILURE;
590 * This method performs DDL operation on cassandra.
592 * @param queryObject query object containing prepared query and values
594 * @throws MusicServiceException
/**
 * Runs a read through executeOneConsistencyGet.
 * NOTE(review): presumably returns results after the try block — confirm.
 *
 * @param queryObject prepared read to execute
 * @throws MusicServiceException carrying the message of any query or service failure
 */
596 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
597 ResultSet results = null;
599 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
600 } catch (MusicQueryException | MusicServiceException e) {
601 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
602 throw new MusicServiceException(e.getMessage());
608 * This method performs DDL operations on cassandra, if the resource is available. Lock ID
609 * is used to check if the resource is free.
611 * @param keyspace name of the keyspace
612 * @param table name of the table
613 * @param primaryKeyValue primary key value
614 * @param queryObject query object containing prepared query and values
615 * @param lockId lock ID to check if the resource is free to perform the operation.
/**
 * Runs a quorum read on behalf of a lock holder.
 * The key embedded in the lock id (when the id contains a '.') must equal primaryKeyValue
 * and the lock must exist; otherwise a MusicLockingException is raised and rethrown as a
 * MusicServiceException. When the lock is not the current owner, null is returned and no
 * read is performed.
 * NOTE(review): presumably returns results after the try block — confirm.
 *
 * @param keyspace keyspace of the row
 * @param table table of the row
 * @param primaryKeyValue primary key of the row being read
 * @param queryObject prepared read to execute
 * @param lockId lock id; the text after the last '$' is used as the lock reference
 * @throws MusicServiceException for a key mismatch, a missing lock, or any query/service failure
 */
618 public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
619 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
620 ResultSet results = null;
// Text between the last '.' and the last '$' of the lock id.
621 String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
623 if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
624 throw new MusicLockingException("Lock value '" + keyLock + "' and key value '"
625 + primaryKeyValue + "' do not match. Please check your values: "
628 LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
629 lockId.substring(lockId.lastIndexOf("$") + 1));
630 if (null == lockObject) {
631 throw new MusicLockingException("No Lock Object. Please check if lock name or key is correct."
634 if ( !lockObject.getIsLockOwner()) {
635 return null;// not top of the lock store q
637 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
// Locking problems are surfaced to callers as service exceptions.
638 } catch ( MusicLockingException e ) {
639 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
640 .WARN, ErrorTypes.MUSICSERVICEERROR);
641 throw new MusicServiceException(
642 "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());
643 } catch (MusicQueryException | MusicServiceException e) {
644 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
645 .WARN, ErrorTypes.MUSICSERVICEERROR, e);
646 throw new MusicServiceException(
647 "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());
653 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
655 * @param keyspaceName name of the keyspace
656 * @param tableName name of the table
657 * @param primaryKey primary key value
658 * @param queryObject query object containing prepared query and values
660 * @throws MusicLockingException
661 * @throws MusicServiceException
662 * @throws MusicQueryException
/**
 * Performs a write under a lock taken for the duration of the call:
 * creates a WRITE lock reference, acquires it with the default lease period, runs
 * criticalPut when the acquisition succeeded, and finally releases the lock voluntarily.
 * When the lock cannot be acquired, the acquisition result is returned instead.
 * On success the returned object carries a timing summary of the individual steps.
 *
 * @param keyspaceName keyspace of the row
 * @param tableName table of the row
 * @param primaryKey primary key of the row being written
 * @param queryObject prepared write to execute
 * @param conditionInfo optional precondition passed to criticalPut
 * @return the criticalPut result, or the lock acquisition result when acquisition failed;
 *         "Lock release failed" is appended to the message if the release throws
 * @throws MusicServiceException when acquireLockWithLease raises a MusicLockingException
 */
664 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
665 PreparedQueryObject queryObject, Condition conditionInfo)
666 throws MusicLockingException, MusicQueryException, MusicServiceException {
667 long start = System.currentTimeMillis();
668 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
669 String lockId = createLockReference(fullyQualifiedKey, LockType.WRITE);
670 long lockCreationTime = System.currentTimeMillis();
671 ReturnType lockAcqResult = null;
672 logger.info(EELFLoggerDelegate.applicationLogger,
673 "***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : " + primaryKey);
674 logger.info(EELFLoggerDelegate.applicationLogger,
675 "***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
676 if (conditionInfo != null) {
677 logger.info(EELFLoggerDelegate.applicationLogger,
678 "***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
681 lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
682 } catch (MusicLockingException ex) {
683 logger.error(EELFLoggerDelegate.errorLogger,
684 "Exception while acquireLockWithLease() in atomic put for key: " + primaryKey);
685 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
686 throw new MusicServiceException(
687 "Cannot perform atomic put for key: " + primaryKey + " : " + ex.getMessage());
689 long lockAcqTime = System.currentTimeMillis();
692 * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { logger.info(EELFLoggerDelegate.
693 * applicationLogger,"unable to acquire lock, id " + lockId);
694 * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
// This message is logged before the acquisition result is examined below.
697 logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
// Keeps the '$' separator: the value is "$" followed by the reference, which is what criticalPut parses.
698 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
699 ReturnType criticalPutResult = null;
700 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
701 criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef, conditionInfo);
702 long criticalPutTime = System.currentTimeMillis();
// Taken right after criticalPutTime and before the release below, so the reported
// "lock delete time" does not include the actual release.
703 long lockDeleteTime = System.currentTimeMillis();
704 String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
705 + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
706 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
707 criticalPutResult.setTimingInfo(timingInfo);
709 logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
710 criticalPutResult = lockAcqResult;
// The full lock id (not just the reference) is handed to voluntaryReleaseLock here.
713 voluntaryReleaseLock(fullyQualifiedKey, lockId);
714 } catch (MusicLockingException ex) {
715 logger.info(EELFLoggerDelegate.applicationLogger,
716 "Exception occured while deleting lock after atomic put for key: " + primaryKey);
717 criticalPutResult.setMessage(criticalPutResult.getMessage() + "Lock release failed");
719 return criticalPutResult;
725 * This method performs DDL operation on cassandra, when the lock for the resource is acquired.
727 * @param keyspaceName name of the keyspace
728 * @param tableName name of the table
729 * @param primaryKey primary key value
730 * @param queryObject query object containing prepared query and values
732 * @throws MusicServiceException
733 * @throws MusicLockingException
734 * @throws MusicQueryException
/**
 * Performs a read under a lock taken for the duration of the call:
 * creates a READ lock reference, acquires it with the default lease period, runs
 * criticalGet when the acquisition succeeded, and finally releases the lock voluntarily.
 * NOTE(review): presumably returns result, which stays null when the lock was not
 * acquired — confirm.
 *
 * @param keyspaceName keyspace of the row
 * @param tableName table of the row
 * @param primaryKey primary key of the row being read
 * @param queryObject prepared read to execute
 * @throws MusicServiceException when acquireLockWithLease raises a MusicLockingException
 * @throws MusicLockingException when releasing the lock fails
 */
736 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
737 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
738 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
739 String lockId = createLockReference(fullyQualifiedKey, LockType.READ);
// Not referenced by the lines below; the lease period is fetched again at the acquire call.
740 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
741 ReturnType lockAcqResult = null;
742 ResultSet result = null;
743 logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());
745 lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
746 } catch (MusicLockingException ex) {
747 logger.error(EELFLoggerDelegate.errorLogger,
748 "Exception while acquireLockWithLease() in atomic get for key: " + primaryKey);
749 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
750 throw new MusicServiceException(
751 "Cannot perform atomic get for key: " + primaryKey + " : " + ex.getMessage());
753 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
754 logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
// Keeps the '$' separator: the value is "$" followed by the reference, which is what criticalGet parses.
755 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
756 result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
758 logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
// The full lock id (not just the reference) is handed to voluntaryReleaseLock here.
761 voluntaryReleaseLock(fullyQualifiedKey, lockId);
762 } catch (MusicLockingException ex) {
763 logger.info(EELFLoggerDelegate.applicationLogger,
764 "Exception occured while deleting lock after atomic put for key: " + primaryKey);
765 throw new MusicLockingException(ex.getMessage());
/**
 * Validates a lock name by delegating to MusicUtil.validateLock.
 *
 * @param lockName lock name to validate
 * @return the map produced by MusicUtil.validateLock
 */
777 public Map<String, Object> validateLock(String lockName) {
778 return MusicUtil.validateLock(lockName);
// Variant of atomicPut with the same parameter list.
// NOTE(review): no statements of the body are shown here; behavior must be confirmed against the body.
783 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
784 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
789 public List<String> getLockQueue(String fullyQualifiedKey)
790 throws MusicServiceException, MusicQueryException, MusicLockingException {
791 String[] splitString = fullyQualifiedKey.split("\\.");
792 String keyspace = splitString[0];
793 String table = splitString[1];
794 String primaryKeyValue = splitString[2];
796 return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
799 public long getLockQueueSize(String fullyQualifiedKey)
800 throws MusicServiceException, MusicQueryException, MusicLockingException {
801 String[] splitString = fullyQualifiedKey.split("\\.");
802 String keyspace = splitString[0];
803 String table = splitString[1];
804 String primaryKeyValue = splitString[2];
806 return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
811 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
812 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {