2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Modifications Copyright (c) 2019 Samsung
8 * ===================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ============LICENSE_END=============================================
22 * ====================================================================
25 package org.onap.music.service.impl;
27 import java.io.StringWriter;
28 import java.util.List;
30 import java.util.StringTokenizer;
32 import org.onap.music.datastore.MusicDataStore;
33 import org.onap.music.datastore.MusicDataStoreHandle;
34 import org.onap.music.datastore.PreparedQueryObject;
35 import org.onap.music.eelf.logging.EELFLoggerDelegate;
36 import org.onap.music.eelf.logging.format.AppMessages;
37 import org.onap.music.eelf.logging.format.ErrorSeverity;
38 import org.onap.music.eelf.logging.format.ErrorTypes;
39 import org.onap.music.exceptions.MusicLockingException;
40 import org.onap.music.exceptions.MusicQueryException;
41 import org.onap.music.exceptions.MusicServiceException;
42 import org.onap.music.lockingservice.cassandra.CassaLockStore;
43 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
44 import org.onap.music.lockingservice.cassandra.MusicLockState;
45 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
46 import org.onap.music.main.MusicUtil;
47 import org.onap.music.main.ResultType;
48 import org.onap.music.main.ReturnType;
49 import org.onap.music.service.MusicCoreService;
51 import com.datastax.driver.core.DataType;
52 import com.datastax.driver.core.ResultSet;
53 import com.datastax.driver.core.Row;
54 import com.datastax.driver.core.TableMetadata;
56 import org.onap.music.datastore.*;
58 public class MusicCassaCore implements MusicCoreService {
60 public static CassaLockStore mLockHandle = null;;
61 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
62 private static boolean unitTestRun=true;
63 private static MusicCassaCore musicCassaCoreInstance = null;
// Private constructor: the only instantiation visible in this file is the one inside getInstance().
65 private MusicCassaCore() {
68 public static MusicCassaCore getInstance() {
70 if(musicCassaCoreInstance == null) {
71 musicCassaCoreInstance = new MusicCassaCore();
73 return musicCassaCoreInstance;
76 public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
77 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
78 long start = System.currentTimeMillis();
80 if (mLockHandle == null) {
82 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
83 } catch (Exception e) {
84 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
85 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
88 long end = System.currentTimeMillis();
89 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
95 public String createLockReference(String fullyQualifiedKey) {
96 String[] splitString = fullyQualifiedKey.split("\\.");
97 String keyspace = splitString[0];
98 String table = splitString[1];
99 String lockName = splitString[2];
101 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
102 long start = System.currentTimeMillis();
103 String lockReference = null;
105 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
106 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
109 long end = System.currentTimeMillis();
110 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
111 return lockReference;
115 public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
116 evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
117 return acquireLock(fullyQualifiedKey, lockReference);
120 private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
122 String[] splitString = fullyQualifiedKey.split("\\.");
123 String keyspace = splitString[0];
124 String table = splitString[1];
125 String primaryKeyValue = splitString[2];
127 LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
129 /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
132 long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
133 if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
134 forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.lockRef+"");
135 logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.lockRef+" forcibly released");
139 private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
141 //return failure to lock holders too early or already evicted from the lock store
142 String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
143 long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
144 long lockReferenceL = Long.parseLong(lockReference);
146 if(lockReferenceL > topOfLockStoreL) {
147 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
148 return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
152 if(lockReferenceL < topOfLockStoreL) {
153 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
154 return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
157 return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
160 public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
161 throws MusicLockingException, MusicQueryException, MusicServiceException {
162 String[] splitString = lockId.split("\\.");
163 String keyspace = splitString[0].substring(1);//remove '$'
164 String table = splitString[1];
165 String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
166 fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
167 String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
169 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockRef);
171 if(result.getResult().equals(ResultType.FAILURE))
172 return result;//not top of the lock store q
174 //check to see if the value of the key has to be synced in case there was a forceful release
175 String syncTable = keyspace+".unsyncedKeys_"+table;
176 String query = "select * from "+syncTable+" where key='"+fullyQualifiedKey+"';";
177 PreparedQueryObject readQueryObject = new PreparedQueryObject();
178 readQueryObject.appendQueryString(query);
179 ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
180 if (results.all().size() != 0) {
181 logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
183 syncQuorum(keyspace, table, primaryKeyValue);
184 } catch (Exception e) {
185 StringWriter sw = new StringWriter();
186 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
187 String exceptionAsString = sw.toString();
188 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
190 String cleanQuery = "delete from music_internal.unsynced_keys where key='"+fullyQualifiedKey+"';";
191 PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
192 deleteQueryObject.appendQueryString(cleanQuery);
193 MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
196 getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
198 return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
205 * @param tableQueryObject
207 * @return Boolean Indicates success or failure
208 * @throws MusicServiceException
212 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject, String consistency) throws MusicServiceException {
213 boolean result = false;
216 //create shadow locking table
217 result = getLockingServiceHandle().createLockQueue(keyspace, table);
219 return ResultType.FAILURE;
223 //create table to track unsynced_keys
224 table = "unsyncedKeys_"+table;
226 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
227 + " ( key text,PRIMARY KEY (key) );";
228 System.out.println(tabQuery);
229 PreparedQueryObject queryObject = new PreparedQueryObject();
231 queryObject.appendQueryString(tabQuery);
233 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
236 //create actual table
237 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
238 } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
239 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
240 throw new MusicServiceException(ex.getMessage());
242 return result?ResultType.SUCCESS:ResultType.FAILURE;
245 private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
246 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
247 PreparedQueryObject selectQuery = new PreparedQueryObject();
248 PreparedQueryObject updateQuery = new PreparedQueryObject();
250 // get the primary key d
251 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
252 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
254 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
255 Object cqlFormattedPrimaryKeyValue =
256 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
258 // get the row of data from a quorum
259 selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
260 + primaryKeyName + "= ?" + ";");
261 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
262 MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
263 cqlFormattedPrimaryKeyValue);
272 public ResultSet quorumGet(PreparedQueryObject query) {
273 ResultSet results = null;
275 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
276 } catch (MusicServiceException | MusicQueryException e) {
277 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
286 * @param fullyQualifiedKey lockName
289 public String whoseTurnIsIt(String fullyQualifiedKey) {
290 String[] splitString = fullyQualifiedKey.split("\\.");
291 String keyspace = splitString[0];
292 String table = splitString[1];
293 String primaryKeyValue = splitString[2];
295 return "$" + fullyQualifiedKey + "$"
296 + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
297 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
298 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
305 * @param lockReference
308 public static String getLockNameFromId(String lockReference) {
309 StringTokenizer st = new StringTokenizer(lockReference);
310 return st.nextToken("$");
314 public void destroyLockRef(String lockId) {
315 long start = System.currentTimeMillis();
316 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
317 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
318 String[] splitString = fullyQualifiedKey.split("\\.");
319 String keyspace = splitString[0];
320 String table = splitString[1];
321 String primaryKeyValue = splitString[2];
323 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef);
324 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
325 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
327 long end = System.currentTimeMillis();
328 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
331 public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) {
332 long start = System.currentTimeMillis();
333 String[] splitString = fullyQualifiedKey.split("\\.");
334 String keyspace = splitString[0];
335 String table = splitString[1];
336 String primaryKeyValue = splitString[2];
338 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference);
339 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
340 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockReference ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
342 long end = System.currentTimeMillis();
343 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
344 return new MusicLockState(LockStatus.UNLOCKED, "");
348 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
349 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
350 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
351 if (voluntaryRelease) {
352 return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
354 return forciblyReleaseLock(fullyQualifiedKey, lockRef);
358 public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) {
359 return destroyLockRef(fullyQualifiedKey, lockReference);
// Forcibly releases a lock: first records the key in <keyspace>.unsyncedKeys_<table>, then
// removes the lock reference through destroyLockRef(fullyQualifiedKey, lockReference).
// fullyQualifiedKey is split on '.'; its first two parts are used as keyspace and table.
// Returns whatever destroyLockRef returns.
362 public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) {
363 String[] splitString = fullyQualifiedKey.split("\\.");
364 String keyspace = splitString[0];
365 String table = splitString[1];
367 //leave a signal that this key could potentially be unsynchronized
368 String syncTable = keyspace+".unsyncedKeys_"+table;
369 PreparedQueryObject queryObject = new PreparedQueryObject();
370 String values = "(?)";
// The key is bound as a prepared value rather than concatenated into the statement.
371 queryObject.addValue(fullyQualifiedKey);
372 String insQuery = "insert into "+syncTable+" (key) values "+values+";";
373 queryObject.appendQueryString(insQuery);
375 MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
// A failure to write the marker is logged; the rest of this handler is not visible in this excerpt.
376 } catch (Exception e) {
377 logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
381 //now release the lock
382 return destroyLockRef(fullyQualifiedKey, lockReference);
388 * @throws MusicLockingException
// Takes a lock name and is declared to throw MusicLockingException.
// The body of this method is not visible in this excerpt, so its behaviour is not described here.
390 public void deleteLock(String lockName) throws MusicLockingException {
394 // Prepared Query Additions.
400 * @throws MusicServiceException
402 public ReturnType eventualPut(PreparedQueryObject queryObject) {
403 boolean result = false;
405 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
406 } catch (MusicServiceException | MusicQueryException ex) {
407 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
408 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
409 return new ReturnType(ResultType.FAILURE, ex.getMessage());
412 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
414 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
422 * @throws MusicServiceException
// Eventual-consistency write that first stamps the query with a timestamp derived from the
// "guard" value stored in <keyspace>.lockq_<tablename> for the given primary key.
// Returns SUCCESS/FAILURE according to the outcome of the final executePut; a failure of the
// guard lookup is only logged.
424 public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
425 boolean result = false;
// Query that reads the guard column for this key from the lock queue table.
427 PreparedQueryObject getGaurd = new PreparedQueryObject();
428 getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
429 getGaurd.addValue(primaryKey);
431 ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
432 Row row = getGaurdResult.one();
// NOTE(review): the declaration of "guard" is not visible in this excerpt.
434 guard = row.getLong("guard");
435 long timeOfWrite = System.currentTimeMillis();
// Timestamp computed by MusicUtil from the guard value and the current wall-clock time.
436 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
437 String query = queryObject.getQuery();
// Only inject a timestamp when the caller has not supplied one already.
438 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
439 if (queryObject.getOperation().equalsIgnoreCase("delete"))
440 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
442 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
444 queryObject.replaceQueryString(query);
447 } catch (MusicServiceException | MusicQueryException e) {
448 logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage());
// The write itself, at eventual consistency.
451 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
452 } catch (MusicServiceException | MusicQueryException ex) {
453 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
454 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
455 return new ReturnType(ResultType.FAILURE, ex.getMessage());
458 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
460 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
468 * @param primaryKeyValue
// Performs a write at critical consistency on behalf of a lock holder.
// Steps visible here: (1) verify that the reference taken from lockId (text after the last '$')
// is at the head of the lock queue, (2) evaluate the optional condition, (3) stamp the query
// with a timestamp derived from the lock reference, (4) execute the put.
// Returns FAILURE when the lock check or the condition fails or an exception is caught,
// otherwise SUCCESS with the message "Update performed".
473 public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
474 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
475 long start = System.currentTimeMillis();
477 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
478 lockId.substring(lockId.lastIndexOf("$")+1));
479 if(result.getResult().equals(ResultType.FAILURE))
480 return result;//not top of the lock store q
// The condition is optional; a null conditionInfo skips the check.
482 if (conditionInfo != null)
484 if (conditionInfo.testCondition() == false)
485 return new ReturnType(ResultType.FAILURE,
486 "Lock acquired but the condition is not true");
487 } catch (Exception e) {
488 return new ReturnType(ResultType.FAILURE,
489 "Exception thrown while checking the condition, check its sanctity:\n"
493 String query = queryObject.getQuery();
494 long timeOfWrite = System.currentTimeMillis();
// The numeric lock reference is combined with the current time into the write timestamp.
495 long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
496 long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
497 // TODO: use Statement instead of modifying query
// Only inject a timestamp when the query does not carry one already.
498 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
499 if (queryObject.getOperation().equalsIgnoreCase("delete"))
500 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
501 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
502 query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
504 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
506 queryObject.replaceQueryString(query);
507 MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
// NOTE(review): the value returned by executePut is not inspected here.
508 dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
509 long end = System.currentTimeMillis();
510 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
511 }catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
512 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
513 return new ReturnType(ResultType.FAILURE,
514 "Exception thrown while doing the critical put\n"
517 return new ReturnType(ResultType.SUCCESS, "Update performed");
525 * @return Boolean Indicates success or failure
526 * @throws MusicServiceException
530 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
531 // this is mainly for some functions like keyspace creation etc which does not
532 // really need the bells and whistles of Music locking.
533 boolean result = false;
535 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
536 } catch (MusicQueryException | MusicServiceException ex) {
537 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
538 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
539 throw new MusicServiceException(ex.getMessage());
541 return result ? ResultType.SUCCESS : ResultType.FAILURE;
545 * This method performs DDL operation on cassandra.
547 * @param queryObject query object containing prepared query and values
549 * @throws MusicServiceException
551 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
552 ResultSet results = null;
554 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
555 } catch (MusicQueryException | MusicServiceException e) {
556 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
557 throw new MusicServiceException(e.getMessage());
563 * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
564 * is used to check if the resource is free.
566 * @param keyspace name of the keyspace
567 * @param table name of the table
568 * @param primaryKeyValue primary key value
569 * @param queryObject query object containing prepared query and values
570 * @param lockId lock ID to check if the resource is free to perform the operation.
573 public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
574 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
575 ResultSet results = null;
578 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
579 lockId.substring(lockId.lastIndexOf("$")+1));
580 if(result.getResult().equals(ResultType.FAILURE))
581 return null;//not top of the lock store q
582 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
583 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
584 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
590 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
592 * @param keyspaceName name of the keyspace
593 * @param tableName name of the table
594 * @param primaryKey primary key value
595 * @param queryObject query object containing prepared query and values
597 * @throws MusicLockingException
598 * @throws MusicServiceException
599 * @throws MusicQueryException
601 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
602 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
603 long start = System.currentTimeMillis();
604 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
605 String lockId = createLockReference(fullyQualifiedKey);
606 long lockCreationTime = System.currentTimeMillis();
607 ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
608 long lockAcqTime = System.currentTimeMillis();
610 if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
611 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
612 voluntaryReleaseLock(fullyQualifiedKey,lockId);
613 return lockAcqResult;
616 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
617 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
618 ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
619 queryObject, lockRef, conditionInfo);
620 long criticalPutTime = System.currentTimeMillis();
621 voluntaryReleaseLock(fullyQualifiedKey,lockId);
622 long lockDeleteTime = System.currentTimeMillis();
623 String timingInfo = "|lock creation time:" + (lockCreationTime - start)
624 + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
625 + "|critical put time:" + (criticalPutTime - lockAcqTime)
626 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
627 criticalPutResult.setTimingInfo(timingInfo);
628 return criticalPutResult;
635 * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
637 * @param keyspaceName name of the keyspace
638 * @param tableName name of the table
639 * @param primaryKey primary key value
640 * @param queryObject query object containing prepared query and values
642 * @throws MusicServiceException
643 * @throws MusicLockingException
644 * @throws MusicQueryException
646 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
647 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
648 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
649 String lockId = createLockReference(fullyQualifiedKey);
650 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
651 ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
652 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
653 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
654 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
656 criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
657 voluntaryReleaseLock(fullyQualifiedKey,lockId);
660 voluntaryReleaseLock(fullyQualifiedKey,lockId);
661 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
672 public Map<String, Object> validateLock(String lockName) {
673 return MusicUtil.validateLock(lockName);
// Declared with the same parameter list as atomicPut, but throws only MusicLockingException.
// The body of this method is not visible in this excerpt, so its behaviour is not described here.
677 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
678 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
683 public List<String> getLockQueue(String fullyQualifiedKey)
684 throws MusicServiceException, MusicQueryException, MusicLockingException {
685 String[] splitString = fullyQualifiedKey.split("\\.");
686 String keyspace = splitString[0];
687 String table = splitString[1];
688 String primaryKeyValue = splitString[2];
690 return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
693 public long getLockQueueSize(String fullyQualifiedKey)
694 throws MusicServiceException, MusicQueryException, MusicLockingException {
695 String[] splitString = fullyQualifiedKey.split("\\.");
696 String keyspace = splitString[0];
697 String table = splitString[1];
698 String primaryKeyValue = splitString[2];
700 return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
703 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
704 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {