2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (c) 2018 IBM.
7 * ===================================================================
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.service.impl;
28 import java.io.StringWriter;
29 import java.util.List;
31 import java.util.StringTokenizer;
32 import java.util.concurrent.TimeUnit;
34 import org.onap.music.datastore.MusicDataStore;
35 import org.onap.music.datastore.MusicDataStoreHandle;
36 import org.onap.music.datastore.PreparedQueryObject;
37 import org.onap.music.eelf.logging.EELFLoggerDelegate;
38 import org.onap.music.eelf.logging.format.AppMessages;
39 import org.onap.music.eelf.logging.format.ErrorSeverity;
40 import org.onap.music.eelf.logging.format.ErrorTypes;
41 import org.onap.music.exceptions.MusicLockingException;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.lockingservice.cassandra.CassaLockStore;
45 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
46 import org.onap.music.lockingservice.cassandra.MusicLockState;
47 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
48 import org.onap.music.main.MusicUtil;
49 import org.onap.music.main.ResultType;
50 import org.onap.music.main.ReturnType;
51 import org.onap.music.service.MusicCoreService;
53 import com.att.eelf.configuration.EELFLogger;
54 import com.datastax.driver.core.DataType;
55 import com.datastax.driver.core.ResultSet;
56 import com.datastax.driver.core.Row;
57 import com.datastax.driver.core.TableMetadata;
59 import org.onap.music.datastore.*;
61 public class MusicCassaCore implements MusicCoreService {
63 public static CassaLockStore mLockHandle = null;;
64 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
65 private static boolean unitTestRun=true;
66 private static MusicCassaCore musicCassaCoreInstance = null;
68 private MusicCassaCore() {
71 public static MusicCassaCore getInstance() {
73 if(musicCassaCoreInstance == null) {
74 musicCassaCoreInstance = new MusicCassaCore();
76 return musicCassaCoreInstance;
79 public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
80 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
81 long start = System.currentTimeMillis();
83 if (mLockHandle == null) {
85 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
86 } catch (Exception e) {
87 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
88 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
91 long end = System.currentTimeMillis();
92 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
98 public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
99 String[] splitString = fullyQualifiedKey.split("\\.");
100 String keyspace = splitString[0];
101 String table = splitString[1];
102 String lockName = splitString[2];
104 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
105 long start = System.currentTimeMillis();
106 String lockReference = null;
109 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
110 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
112 throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
113 } catch (Exception e) {
114 logger.error(EELFLoggerDelegate.applicationLogger, e);
115 throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
117 long end = System.currentTimeMillis();
118 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
119 return lockReference;
123 public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
124 evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
125 return acquireLock(fullyQualifiedKey, lockReference);
128 private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
130 String[] splitString = fullyQualifiedKey.split("\\.");
131 String keyspace = splitString[0];
132 String table = splitString[1];
133 String primaryKeyValue = splitString[2];
135 LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
137 if (currentLockHolderObject==null) { //no lock holder
140 /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
142 long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
143 if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
144 forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.lockRef+"");
145 logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.lockRef+" forcibly released");
149 private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
150 //return failure to lock holders too early or already evicted from the lock store
151 LockObject topOfLockStore = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
152 if (topOfLockStore==null) {
153 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
154 return new ReturnType(ResultType.FAILURE, "No lock holder!");
157 String topOfLockStoreS = topOfLockStore.lockRef;
158 long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
159 long lockReferenceL = Long.parseLong(lockReference);
161 if(lockReferenceL > topOfLockStoreL) {
162 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
163 return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
166 if(lockReferenceL < topOfLockStoreL) {
167 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
168 return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
171 return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
174 public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
175 throws MusicLockingException, MusicQueryException, MusicServiceException {
176 String[] splitString = lockId.split("\\.");
177 String keyspace = splitString[0].substring(1);//remove '$'
178 String table = splitString[1];
179 String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
180 String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
181 String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
183 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockRef);
185 if(result.getResult().equals(ResultType.FAILURE))
186 return result;//not top of the lock store q
188 //check to see if the value of the key has to be synced in case there was a forceful release
189 String syncTable = keyspace+".unsyncedKeys_"+table;
190 String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
191 PreparedQueryObject readQueryObject = new PreparedQueryObject();
192 readQueryObject.appendQueryString(query);
193 ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
194 if (results.all().size() != 0) {
195 logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
197 syncQuorum(keyspace, table, primaryKeyValue);
198 } catch (Exception e) {
199 StringWriter sw = new StringWriter();
200 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
201 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
202 String exceptionAsString = sw.toString();
203 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
205 String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
206 PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
207 deleteQueryObject.appendQueryString(cleanQuery);
208 MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
211 getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
213 return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
220 * @param tableQueryObject
222 * @return Boolean Indicates success or failure
223 * @throws MusicServiceException
227 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject, String consistency) throws MusicServiceException {
228 boolean result = false;
231 //create shadow locking table
232 result = getLockingServiceHandle().createLockQueue(keyspace, table);
234 return ResultType.FAILURE;
238 //create table to track unsynced_keys
239 table = "unsyncedKeys_"+table;
241 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
242 + " ( key text,PRIMARY KEY (key) );";
243 PreparedQueryObject queryObject = new PreparedQueryObject();
245 queryObject.appendQueryString(tabQuery);
247 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
250 //create actual table
251 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
252 } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
253 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
254 .WARN, ErrorTypes.MUSICSERVICEERROR, ex);
255 throw new MusicServiceException(ex.getMessage());
257 return result?ResultType.SUCCESS:ResultType.FAILURE;
260 private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
261 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
262 PreparedQueryObject selectQuery = new PreparedQueryObject();
263 PreparedQueryObject updateQuery = new PreparedQueryObject();
265 // get the primary key d
266 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
267 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
269 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
270 Object cqlFormattedPrimaryKeyValue =
271 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
273 // get the row of data from a quorum
274 selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
275 + primaryKeyName + "= ?" + ";");
276 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
277 MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
278 cqlFormattedPrimaryKeyValue);
287 public ResultSet quorumGet(PreparedQueryObject query) {
288 ResultSet results = null;
290 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
291 } catch (MusicServiceException | MusicQueryException e) {
292 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
293 .MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
302 * @param fullyQualifiedKey lockName
305 public String whoseTurnIsIt(String fullyQualifiedKey) {
306 String[] splitString = fullyQualifiedKey.split("\\.");
307 String keyspace = splitString[0];
308 String table = splitString[1];
309 String primaryKeyValue = splitString[2];
311 LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
312 if (lockOwner==null) {
313 return "No lock holder!";
315 return "$" + fullyQualifiedKey + "$"
316 + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
317 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
318 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey
319 ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
326 * @param lockReference
329 public static String getLockNameFromId(String lockReference) {
330 StringTokenizer st = new StringTokenizer(lockReference);
331 return st.nextToken("$");
335 public void destroyLockRef(String lockId) throws MusicLockingException {
336 long start = System.currentTimeMillis();
337 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
338 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
339 String[] splitString = fullyQualifiedKey.split("\\.");
340 String keyspace = splitString[0];
341 String table = splitString[1];
342 String primaryKeyValue = splitString[2];
344 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
345 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
346 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
347 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
348 throw new MusicLockingException(e.getMessage());
350 long end = System.currentTimeMillis();
351 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
354 public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
355 long start = System.currentTimeMillis();
356 String[] splitString = fullyQualifiedKey.split("\\.");
357 String keyspace = splitString[0];
358 String table = splitString[1];
359 String primaryKeyValue = splitString[2];
361 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
362 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
363 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
364 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
365 throw new MusicLockingException(e.getMessage());
367 long end = System.currentTimeMillis();
368 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
369 return new MusicLockState(LockStatus.UNLOCKED, "");
373 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
374 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
375 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
376 if (voluntaryRelease) {
377 return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
379 return forciblyReleaseLock(fullyQualifiedKey, lockRef);
383 public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
384 MusicLockState result = null;
386 result = destroyLockRef(fullyQualifiedKey, lockReference);
388 catch(Exception ex) {
389 logger.info(EELFLoggerDelegate.applicationLogger,"Exception in voluntaryReleaseLock() for "+ fullyQualifiedKey + "ref: " + lockReference);
390 throw new MusicLockingException(ex.getMessage());
395 public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
396 String[] splitString = fullyQualifiedKey.split("\\.");
397 String keyspace = splitString[0];
398 String table = splitString[1];
400 //leave a signal that this key could potentially be unsynchronized
401 String syncTable = keyspace+".unsyncedKeys_"+table;
402 PreparedQueryObject queryObject = new PreparedQueryObject();
403 String values = "(?)";
404 queryObject.addValue(fullyQualifiedKey);
405 String insQuery = "insert into "+syncTable+" (key) values "+values+";";
406 queryObject.appendQueryString(insQuery);
408 MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
409 } catch (Exception e) {
410 logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
411 + e.getMessage(), e);
414 //now release the lock
415 return destroyLockRef(fullyQualifiedKey, lockReference);
421 * @throws MusicLockingException
424 public void deleteLock(String lockName) throws MusicLockingException {
425 throw new MusicLockingException("Depreciated Method Delete Lock");
428 // Prepared Query Additions.
434 * @throws MusicServiceException
436 public ReturnType eventualPut(PreparedQueryObject queryObject) {
437 boolean result = false;
439 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
440 } catch (MusicServiceException | MusicQueryException ex) {
441 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
442 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
443 return new ReturnType(ResultType.FAILURE, ex.getMessage());
446 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
448 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
456 * @throws MusicServiceException
458 public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
459 boolean result = false;
461 PreparedQueryObject getGaurd = new PreparedQueryObject();
462 getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
463 getGaurd.addValue(primaryKey);
465 ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
466 Row row = getGaurdResult.one();
468 guard = row.getLong("guard");
469 long timeOfWrite = System.currentTimeMillis();
470 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
471 String query = queryObject.getQuery();
472 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
473 if (queryObject.getOperation().equalsIgnoreCase("delete"))
474 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
476 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
478 queryObject.replaceQueryString(query);
481 } catch (MusicServiceException | MusicQueryException e) {
482 logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
485 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
486 } catch (MusicServiceException | MusicQueryException ex) {
487 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
488 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
489 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " ", ex);
490 return new ReturnType(ResultType.FAILURE, ex.getMessage());
493 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
495 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
503 * @param primaryKeyValue
508 public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
509 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
510 long start = System.currentTimeMillis();
512 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
513 lockId.substring(lockId.lastIndexOf("$")+1));
514 if(result.getResult().equals(ResultType.FAILURE))
515 return result;//not top of the lock store q
517 if (conditionInfo != null)
519 if (conditionInfo.testCondition() == false)
520 return new ReturnType(ResultType.FAILURE,
521 "Lock acquired but the condition is not true");
522 } catch (Exception e) {
523 logger.error(EELFLoggerDelegate.errorLogger, e);
524 return new ReturnType(ResultType.FAILURE,
525 "Exception thrown while checking the condition, check its sanctity:\n"
528 String query = queryObject.getQuery();
529 long timeOfWrite = System.currentTimeMillis();
530 long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
531 long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
532 // TODO: use Statement instead of modifying query
533 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
534 if (queryObject.getOperation().equalsIgnoreCase("delete"))
535 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
536 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
537 query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
539 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
541 queryObject.replaceQueryString(query);
542 MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
543 dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
544 long end = System.currentTimeMillis();
545 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
546 }catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
547 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
548 return new ReturnType(ResultType.FAILURE,
549 "Exception thrown while doing the critical put\n"
552 return new ReturnType(ResultType.SUCCESS, "Update performed");
560 * @return Boolean Indicates success or failure
561 * @throws MusicServiceException
565 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
566 // this is mainly for some functions like keyspace creation etc which does not
567 // really need the bells and whistles of Music locking.
568 boolean result = false;
570 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
571 } catch (MusicQueryException | MusicServiceException ex) {
572 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
573 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
574 throw new MusicServiceException(ex.getMessage());
576 return result ? ResultType.SUCCESS : ResultType.FAILURE;
580 * This method performs DDL operation on cassandra.
582 * @param queryObject query object containing prepared query and values
584 * @throws MusicServiceException
586 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
587 ResultSet results = null;
589 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
590 } catch (MusicQueryException | MusicServiceException e) {
591 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
592 throw new MusicServiceException(e.getMessage());
598 * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
599 * is used to check if the resource is free.
601 * @param keyspace name of the keyspace
602 * @param table name of the table
603 * @param primaryKeyValue primary key value
604 * @param queryObject query object containing prepared query and values
605 * @param lockId lock ID to check if the resource is free to perform the operation.
608 public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
609 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
610 ResultSet results = null;
613 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
614 lockId.substring(lockId.lastIndexOf("$")+1));
615 if(result.getResult().equals(ResultType.FAILURE))
616 return null;//not top of the lock store q
617 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
618 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
619 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
620 .WARN, ErrorTypes.MUSICSERVICEERROR, e);
626 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
628 * @param keyspaceName name of the keyspace
629 * @param tableName name of the table
630 * @param primaryKey primary key value
631 * @param queryObject query object containing prepared query and values
633 * @throws MusicLockingException
634 * @throws MusicServiceException
635 * @throws MusicQueryException
637 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
638 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
639 long start = System.currentTimeMillis();
640 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
641 String lockId = createLockReference(fullyQualifiedKey);
642 long lockCreationTime = System.currentTimeMillis();
643 ReturnType lockAcqResult = null;
644 logger.info(EELFLoggerDelegate.applicationLogger,"***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : "+ primaryKey);
645 logger.info(EELFLoggerDelegate.applicationLogger,"***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
646 if(conditionInfo!=null)
647 logger.info(EELFLoggerDelegate.applicationLogger,"***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
649 lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId,MusicUtil.getDefaultLockLeasePeriod());
650 }catch(MusicLockingException ex) {
651 logger.error(EELFLoggerDelegate.errorLogger,"Exception while acquireLockWithLease() in atomic put for key: "+primaryKey);
652 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage());
653 throw new MusicServiceException("Cannot perform atomic put for key: "+primaryKey+ " : "+ ex.getMessage());
655 long lockAcqTime = System.currentTimeMillis();
658 * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
659 * logger.info(EELFLoggerDelegate.
660 * applicationLogger,"unable to acquire lock, id " + lockId);
661 * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
664 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
665 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
666 ReturnType criticalPutResult = null ;
667 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
668 criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef,
670 long criticalPutTime = System.currentTimeMillis();
671 long lockDeleteTime = System.currentTimeMillis();
672 String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
673 + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
674 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
675 criticalPutResult.setTimingInfo(timingInfo);
677 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
678 criticalPutResult = lockAcqResult;
681 voluntaryReleaseLock(fullyQualifiedKey, lockId);
683 } catch (MusicLockingException ex) {
684 logger.info(EELFLoggerDelegate.applicationLogger,"Exception occured while deleting lock after atomic put for key: "+primaryKey);
685 criticalPutResult.setMessage(criticalPutResult.getMessage()+ "Lock release failed");
687 return criticalPutResult;
693 * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
695 * @param keyspaceName name of the keyspace
696 * @param tableName name of the table
697 * @param primaryKey primary key value
698 * @param queryObject query object containing prepared query and values
700 * @throws MusicServiceException
701 * @throws MusicLockingException
702 * @throws MusicQueryException
704 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
705 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
706 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
707 String lockId = createLockReference(fullyQualifiedKey);
708 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
709 ReturnType lockAcqResult = null;
710 ResultSet result =null;
711 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock for atomicGet() : " + queryObject.getQuery());
713 lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId,MusicUtil.getDefaultLockLeasePeriod());
714 }catch(MusicLockingException ex) {
715 logger.error(EELFLoggerDelegate.errorLogger,"Exception while acquireLockWithLease() in atomic get for key: "+primaryKey);
716 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage());
717 throw new MusicServiceException("Cannot perform atomic get for key: "+primaryKey+ " : "+ ex.getMessage());
719 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
720 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
721 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
722 result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
724 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
728 voluntaryReleaseLock(fullyQualifiedKey,lockId);
729 }catch(MusicLockingException ex) {
730 logger.info(EELFLoggerDelegate.applicationLogger,"Exception occured while deleting lock after atomic put for key: "+primaryKey);
731 throw new MusicLockingException(ex.getMessage());
743 public Map<String, Object> validateLock(String lockName) {
744 return MusicUtil.validateLock(lockName);
749 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
750 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
755 public List<String> getLockQueue(String fullyQualifiedKey)
756 throws MusicServiceException, MusicQueryException, MusicLockingException {
757 String[] splitString = fullyQualifiedKey.split("\\.");
758 String keyspace = splitString[0];
759 String table = splitString[1];
760 String primaryKeyValue = splitString[2];
762 return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
765 public long getLockQueueSize(String fullyQualifiedKey)
766 throws MusicServiceException, MusicQueryException, MusicLockingException {
767 String[] splitString = fullyQualifiedKey.split("\\.");
768 String keyspace = splitString[0];
769 String table = splitString[1];
770 String primaryKeyValue = splitString[2];
772 return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
777 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
778 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {