2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (c) 2018 IBM.
7 * ===================================================================
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.service.impl;
28 import java.io.StringWriter;
29 import java.util.List;
31 import java.util.StringTokenizer;
33 import org.onap.music.datastore.MusicDataStore;
34 import org.onap.music.datastore.MusicDataStoreHandle;
35 import org.onap.music.datastore.PreparedQueryObject;
36 import org.onap.music.eelf.logging.EELFLoggerDelegate;
37 import org.onap.music.eelf.logging.format.AppMessages;
38 import org.onap.music.eelf.logging.format.ErrorSeverity;
39 import org.onap.music.eelf.logging.format.ErrorTypes;
40 import org.onap.music.exceptions.MusicLockingException;
41 import org.onap.music.exceptions.MusicQueryException;
42 import org.onap.music.exceptions.MusicServiceException;
43 import org.onap.music.lockingservice.cassandra.CassaLockStore;
44 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
45 import org.onap.music.lockingservice.cassandra.MusicLockState;
46 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
47 import org.onap.music.main.MusicUtil;
48 import org.onap.music.main.ResultType;
49 import org.onap.music.main.ReturnType;
50 import org.onap.music.service.MusicCoreService;
52 import com.datastax.driver.core.DataType;
53 import com.datastax.driver.core.ResultSet;
54 import com.datastax.driver.core.Row;
55 import com.datastax.driver.core.TableMetadata;
57 import org.onap.music.datastore.*;
59 public class MusicCassaCore implements MusicCoreService {
61 public static CassaLockStore mLockHandle = null;;
62 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
63 private static boolean unitTestRun=true;
64 private static MusicCassaCore musicCassaCoreInstance = null;
66 private MusicCassaCore() {
69 public static MusicCassaCore getInstance() {
71 if(musicCassaCoreInstance == null) {
72 musicCassaCoreInstance = new MusicCassaCore();
74 return musicCassaCoreInstance;
77 public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
78 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
79 long start = System.currentTimeMillis();
81 if (mLockHandle == null) {
83 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
84 } catch (Exception e) {
85 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
86 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
89 long end = System.currentTimeMillis();
90 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
96 public String createLockReference(String fullyQualifiedKey) {
97 String[] splitString = fullyQualifiedKey.split("\\.");
98 String keyspace = splitString[0];
99 String table = splitString[1];
100 String lockName = splitString[2];
102 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
103 long start = System.currentTimeMillis();
104 String lockReference = null;
106 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
107 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
110 long end = System.currentTimeMillis();
111 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
112 return lockReference;
116 public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
117 evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
118 return acquireLock(fullyQualifiedKey, lockReference);
121 private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
123 String[] splitString = fullyQualifiedKey.split("\\.");
124 String keyspace = splitString[0];
125 String table = splitString[1];
126 String primaryKeyValue = splitString[2];
128 LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
130 /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
133 long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
134 if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
135 forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.lockRef+"");
136 logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.lockRef+" forcibly released");
140 private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
142 //return failure to lock holders too early or already evicted from the lock store
143 String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
144 long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
145 long lockReferenceL = Long.parseLong(lockReference);
147 if(lockReferenceL > topOfLockStoreL) {
148 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
149 return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
153 if(lockReferenceL < topOfLockStoreL) {
154 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
155 return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
158 return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
161 public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
162 throws MusicLockingException, MusicQueryException, MusicServiceException {
163 String[] splitString = lockId.split("\\.");
164 String keyspace = splitString[0].substring(1);//remove '$'
165 String table = splitString[1];
166 String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
167 String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
168 String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
170 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockRef);
172 if(result.getResult().equals(ResultType.FAILURE))
173 return result;//not top of the lock store q
175 //check to see if the value of the key has to be synced in case there was a forceful release
176 String syncTable = keyspace+".unsyncedKeys_"+table;
177 String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
178 PreparedQueryObject readQueryObject = new PreparedQueryObject();
179 readQueryObject.appendQueryString(query);
180 ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
181 if (results.all().size() != 0) {
182 logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
184 syncQuorum(keyspace, table, primaryKeyValue);
185 } catch (Exception e) {
186 StringWriter sw = new StringWriter();
187 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
188 String exceptionAsString = sw.toString();
189 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
191 String cleanQuery = "delete from music_internal.unsynced_keys where key='"+localFullyQualifiedKey+"';";
192 PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
193 deleteQueryObject.appendQueryString(cleanQuery);
194 MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
197 getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
199 return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
206 * @param tableQueryObject
208 * @return Boolean Indicates success or failure
209 * @throws MusicServiceException
213 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject, String consistency) throws MusicServiceException {
214 boolean result = false;
217 //create shadow locking table
218 result = getLockingServiceHandle().createLockQueue(keyspace, table);
220 return ResultType.FAILURE;
224 //create table to track unsynced_keys
225 table = "unsyncedKeys_"+table;
227 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
228 + " ( key text,PRIMARY KEY (key) );";
229 PreparedQueryObject queryObject = new PreparedQueryObject();
231 queryObject.appendQueryString(tabQuery);
233 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
236 //create actual table
237 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
238 } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
239 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
240 throw new MusicServiceException(ex.getMessage());
242 return result?ResultType.SUCCESS:ResultType.FAILURE;
245 private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
246 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
247 PreparedQueryObject selectQuery = new PreparedQueryObject();
248 PreparedQueryObject updateQuery = new PreparedQueryObject();
250 // get the primary key d
251 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
252 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
254 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
255 Object cqlFormattedPrimaryKeyValue =
256 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
258 // get the row of data from a quorum
259 selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
260 + primaryKeyName + "= ?" + ";");
261 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
262 MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
263 cqlFormattedPrimaryKeyValue);
272 public ResultSet quorumGet(PreparedQueryObject query) {
273 ResultSet results = null;
275 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
276 } catch (MusicServiceException | MusicQueryException e) {
277 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
286 * @param fullyQualifiedKey lockName
289 public String whoseTurnIsIt(String fullyQualifiedKey) {
290 String[] splitString = fullyQualifiedKey.split("\\.");
291 String keyspace = splitString[0];
292 String table = splitString[1];
293 String primaryKeyValue = splitString[2];
295 return "$" + fullyQualifiedKey + "$"
296 + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
297 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
298 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
305 * @param lockReference
308 public static String getLockNameFromId(String lockReference) {
309 StringTokenizer st = new StringTokenizer(lockReference);
310 return st.nextToken("$");
314 public void destroyLockRef(String lockId) {
315 long start = System.currentTimeMillis();
316 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
317 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
318 String[] splitString = fullyQualifiedKey.split("\\.");
319 String keyspace = splitString[0];
320 String table = splitString[1];
321 String primaryKeyValue = splitString[2];
323 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef);
324 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
325 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
327 long end = System.currentTimeMillis();
328 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
331 public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) {
332 long start = System.currentTimeMillis();
333 String[] splitString = fullyQualifiedKey.split("\\.");
334 String keyspace = splitString[0];
335 String table = splitString[1];
336 String primaryKeyValue = splitString[2];
338 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference);
339 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
340 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockReference ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
342 long end = System.currentTimeMillis();
343 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
344 return new MusicLockState(LockStatus.UNLOCKED, "");
348 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
349 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
350 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
351 if (voluntaryRelease) {
352 return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
354 return forciblyReleaseLock(fullyQualifiedKey, lockRef);
358 public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) {
359 return destroyLockRef(fullyQualifiedKey, lockReference);
362 public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) {
363 String[] splitString = fullyQualifiedKey.split("\\.");
364 String keyspace = splitString[0];
365 String table = splitString[1];
367 //leave a signal that this key could potentially be unsynchronized
368 String syncTable = keyspace+".unsyncedKeys_"+table;
369 PreparedQueryObject queryObject = new PreparedQueryObject();
370 String values = "(?)";
371 queryObject.addValue(fullyQualifiedKey);
372 String insQuery = "insert into "+syncTable+" (key) values "+values+";";
373 queryObject.appendQueryString(insQuery);
375 MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
376 } catch (Exception e) {
377 logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
381 //now release the lock
382 return destroyLockRef(fullyQualifiedKey, lockReference);
388 * @throws MusicLockingException
390 public void deleteLock(String lockName) throws MusicLockingException {
394 // Prepared Query Additions.
400 * @throws MusicServiceException
402 public ReturnType eventualPut(PreparedQueryObject queryObject) {
403 boolean result = false;
405 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
406 } catch (MusicServiceException | MusicQueryException ex) {
407 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
408 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
409 return new ReturnType(ResultType.FAILURE, ex.getMessage());
412 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
414 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
422 * @throws MusicServiceException
424 public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
425 boolean result = false;
427 PreparedQueryObject getGaurd = new PreparedQueryObject();
428 getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
429 getGaurd.addValue(primaryKey);
431 ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
432 Row row = getGaurdResult.one();
434 guard = row.getLong("guard");
435 long timeOfWrite = System.currentTimeMillis();
436 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
437 String query = queryObject.getQuery();
438 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
439 if (queryObject.getOperation().equalsIgnoreCase("delete"))
440 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
442 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
444 queryObject.replaceQueryString(query);
447 } catch (MusicServiceException | MusicQueryException e) {
448 logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage());
451 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
452 } catch (MusicServiceException | MusicQueryException ex) {
453 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
454 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
455 return new ReturnType(ResultType.FAILURE, ex.getMessage());
458 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
460 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
468 * @param primaryKeyValue
473 public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
474 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
475 long start = System.currentTimeMillis();
477 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
478 lockId.substring(lockId.lastIndexOf("$")+1));
479 if(result.getResult().equals(ResultType.FAILURE))
480 return result;//not top of the lock store q
482 if (conditionInfo != null)
484 if (conditionInfo.testCondition() == false)
485 return new ReturnType(ResultType.FAILURE,
486 "Lock acquired but the condition is not true");
487 } catch (Exception e) {
488 return new ReturnType(ResultType.FAILURE,
489 "Exception thrown while checking the condition, check its sanctity:\n"
493 String query = queryObject.getQuery();
494 long timeOfWrite = System.currentTimeMillis();
495 long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
496 long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
497 // TODO: use Statement instead of modifying query
498 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
499 if (queryObject.getOperation().equalsIgnoreCase("delete"))
500 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
501 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
502 query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
504 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
506 queryObject.replaceQueryString(query);
507 MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
508 dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
509 long end = System.currentTimeMillis();
510 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
511 }catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
512 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
513 return new ReturnType(ResultType.FAILURE,
514 "Exception thrown while doing the critical put\n"
517 return new ReturnType(ResultType.SUCCESS, "Update performed");
525 * @return Boolean Indicates success or failure
526 * @throws MusicServiceException
530 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
531 // this is mainly for some functions like keyspace creation etc which does not
532 // really need the bells and whistles of Music locking.
533 boolean result = false;
535 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
536 } catch (MusicQueryException | MusicServiceException ex) {
537 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
538 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
539 throw new MusicServiceException(ex.getMessage());
541 return result ? ResultType.SUCCESS : ResultType.FAILURE;
545 * This method performs DDL operation on cassandra.
547 * @param queryObject query object containing prepared query and values
549 * @throws MusicServiceException
551 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
552 ResultSet results = null;
554 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
555 } catch (MusicQueryException | MusicServiceException e) {
556 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
557 throw new MusicServiceException(e.getMessage());
563 * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
564 * is used to check if the resource is free.
566 * @param keyspace name of the keyspace
567 * @param table name of the table
568 * @param primaryKeyValue primary key value
569 * @param queryObject query object containing prepared query and values
570 * @param lockId lock ID to check if the resource is free to perform the operation.
573 public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
574 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
575 ResultSet results = null;
578 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
579 lockId.substring(lockId.lastIndexOf("$")+1));
580 if(result.getResult().equals(ResultType.FAILURE))
581 return null;//not top of the lock store q
582 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
583 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
584 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
590 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
592 * @param keyspaceName name of the keyspace
593 * @param tableName name of the table
594 * @param primaryKey primary key value
595 * @param queryObject query object containing prepared query and values
597 * @throws MusicLockingException
598 * @throws MusicServiceException
599 * @throws MusicQueryException
601 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
602 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
603 long start = System.currentTimeMillis();
604 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
605 String lockId = createLockReference(fullyQualifiedKey);
606 long lockCreationTime = System.currentTimeMillis();
607 ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
608 long lockAcqTime = System.currentTimeMillis();
610 if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
611 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
612 voluntaryReleaseLock(fullyQualifiedKey,lockId);
613 return lockAcqResult;
616 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
617 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
618 ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
619 queryObject, lockRef, conditionInfo);
620 long criticalPutTime = System.currentTimeMillis();
621 voluntaryReleaseLock(fullyQualifiedKey,lockId);
622 long lockDeleteTime = System.currentTimeMillis();
623 String timingInfo = "|lock creation time:" + (lockCreationTime - start)
624 + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
625 + "|critical put time:" + (criticalPutTime - lockAcqTime)
626 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
627 criticalPutResult.setTimingInfo(timingInfo);
628 return criticalPutResult;
635 * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
637 * @param keyspaceName name of the keyspace
638 * @param tableName name of the table
639 * @param primaryKey primary key value
640 * @param queryObject query object containing prepared query and values
642 * @throws MusicServiceException
643 * @throws MusicLockingException
644 * @throws MusicQueryException
646 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
647 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
648 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
649 String lockId = createLockReference(fullyQualifiedKey);
650 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
651 ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
652 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
653 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
654 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
656 criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
657 voluntaryReleaseLock(fullyQualifiedKey,lockId);
660 voluntaryReleaseLock(fullyQualifiedKey,lockId);
661 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
672 public Map<String, Object> validateLock(String lockName) {
673 return MusicUtil.validateLock(lockName);
677 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
678 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
683 public List<String> getLockQueue(String fullyQualifiedKey)
684 throws MusicServiceException, MusicQueryException, MusicLockingException {
685 String[] splitString = fullyQualifiedKey.split("\\.");
686 String keyspace = splitString[0];
687 String table = splitString[1];
688 String primaryKeyValue = splitString[2];
690 return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
693 public long getLockQueueSize(String fullyQualifiedKey)
694 throws MusicServiceException, MusicQueryException, MusicLockingException {
695 String[] splitString = fullyQualifiedKey.split("\\.");
696 String keyspace = splitString[0];
697 String table = splitString[1];
698 String primaryKeyValue = splitString[2];
700 return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
703 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
704 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {