2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (c) 2018 IBM.
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 * ============LICENSE_END=============================================
21 * ====================================================================
24 package org.onap.music.service.impl;
26 import java.io.StringWriter;
27 import java.util.HashMap;
28 import java.util.List;
30 import java.util.StringTokenizer;
32 import org.onap.music.datastore.MusicDataStore;
33 import org.onap.music.datastore.MusicDataStoreHandle;
34 import org.onap.music.datastore.PreparedQueryObject;
35 import org.onap.music.eelf.logging.EELFLoggerDelegate;
36 import org.onap.music.eelf.logging.format.AppMessages;
37 import org.onap.music.eelf.logging.format.ErrorSeverity;
38 import org.onap.music.eelf.logging.format.ErrorTypes;
39 import org.onap.music.exceptions.MusicLockingException;
40 import org.onap.music.exceptions.MusicQueryException;
41 import org.onap.music.exceptions.MusicServiceException;
42 import org.onap.music.lockingservice.cassandra.CassaLockStore;
43 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
44 import org.onap.music.lockingservice.cassandra.MusicLockState;
45 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
46 import org.onap.music.main.MusicUtil;
47 import org.onap.music.main.ResultType;
48 import org.onap.music.main.ReturnType;
49 import org.onap.music.service.MusicCoreService;
51 import com.datastax.driver.core.ColumnDefinitions;
52 import com.datastax.driver.core.ColumnDefinitions.Definition;
53 import com.datastax.driver.core.DataType;
54 import com.datastax.driver.core.ResultSet;
55 import com.datastax.driver.core.Row;
56 import com.datastax.driver.core.TableMetadata;
57 import com.google.common.util.concurrent.Monitor.Guard;
59 import org.onap.music.datastore.*;
61 public class MusicCassaCore implements MusicCoreService {
63 public static CassaLockStore mLockHandle = null;;
64 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
65 private static boolean unitTestRun=true;
66 private static MusicCassaCore musicCassaCoreInstance = null;
68 private MusicCassaCore() {
    // Private constructor: within this file it is only invoked from getInstance().
    // NOTE(review): the constructor body is not visible in this excerpt — confirm it is empty.
71 public static MusicCassaCore getInstance() {
73 if(musicCassaCoreInstance == null) {
74 musicCassaCoreInstance = new MusicCassaCore();
76 return musicCassaCoreInstance;
79 public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
80 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
81 long start = System.currentTimeMillis();
83 if (mLockHandle == null) {
85 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
86 } catch (Exception e) {
87 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
88 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
91 long end = System.currentTimeMillis();
92 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
98 public String createLockReference(String fullyQualifiedKey) {
99 String[] splitString = fullyQualifiedKey.split("\\.");
100 String keyspace = splitString[0];
101 String table = splitString[1];
102 String lockName = splitString[2];
104 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
105 long start = System.currentTimeMillis();
106 String lockReference = null;
108 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
109 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
112 long end = System.currentTimeMillis();
113 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
114 return lockReference;
118 public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
119 evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
120 return acquireLock(fullyQualifiedKey, lockReference);
123 private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
125 String[] splitString = fullyQualifiedKey.split("\\.");
126 String keyspace = splitString[0];
127 String table = splitString[1];
128 String primaryKeyValue = splitString[2];
130 LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
132 /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
135 long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
136 if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
137 forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.lockRef+"");
138 logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.lockRef+" forcibly released");
142 private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
144 //return failure to lock holders too early or already evicted from the lock store
145 String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
146 long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
147 long lockReferenceL = Long.parseLong(lockReference);
149 if(lockReferenceL > topOfLockStoreL) {
150 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
151 return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
155 if(lockReferenceL < topOfLockStoreL) {
156 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
157 return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
160 return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
163 public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
164 throws MusicLockingException, MusicQueryException, MusicServiceException {
165 String[] splitString = lockId.split("\\.");
166 String keyspace = splitString[0].substring(1);//remove '$'
167 String table = splitString[1];
168 String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
169 String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
170 String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
172 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockRef);
174 if(result.getResult().equals(ResultType.FAILURE))
175 return result;//not top of the lock store q
177 //check to see if the value of the key has to be synced in case there was a forceful release
178 String syncTable = keyspace+".unsyncedKeys_"+table;
179 String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
180 PreparedQueryObject readQueryObject = new PreparedQueryObject();
181 readQueryObject.appendQueryString(query);
182 ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
183 if (results.all().size() != 0) {
184 logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
186 syncQuorum(keyspace, table, primaryKeyValue);
187 } catch (Exception e) {
188 StringWriter sw = new StringWriter();
189 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
190 String exceptionAsString = sw.toString();
191 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
193 String cleanQuery = "delete from music_internal.unsynced_keys where key='"+localFullyQualifiedKey+"';";
194 PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
195 deleteQueryObject.appendQueryString(cleanQuery);
196 MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
199 getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
201 return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
208 * @param tableQueryObject
210 * @return Boolean Indicates success or failure
211 * @throws MusicServiceException
215 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject, String consistency) throws MusicServiceException {
216 boolean result = false;
219 //create shadow locking table
220 result = getLockingServiceHandle().createLockQueue(keyspace, table);
222 return ResultType.FAILURE;
226 //create table to track unsynced_keys
227 table = "unsyncedKeys_"+table;
229 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
230 + " ( key text,PRIMARY KEY (key) );";
231 System.out.println(tabQuery);
232 PreparedQueryObject queryObject = new PreparedQueryObject();
234 queryObject.appendQueryString(tabQuery);
236 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
239 //create actual table
240 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
241 } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
242 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
243 throw new MusicServiceException(ex.getMessage());
245 return result?ResultType.SUCCESS:ResultType.FAILURE;
248 private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
249 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
250 PreparedQueryObject selectQuery = new PreparedQueryObject();
251 PreparedQueryObject updateQuery = new PreparedQueryObject();
253 // get the primary key d
254 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
255 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
257 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
258 Object cqlFormattedPrimaryKeyValue =
259 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
261 // get the row of data from a quorum
262 selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
263 + primaryKeyName + "= ?" + ";");
264 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
265 ResultSet results = null;
267 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(selectQuery);
268 // write it back to a quorum
269 Row row = results.one();
270 ColumnDefinitions colInfo = row.getColumnDefinitions();
271 int totalColumns = colInfo.size();
273 StringBuilder fieldValueString = new StringBuilder("");
274 for (Definition definition : colInfo) {
275 String colName = definition.getName();
276 if (colName.equals(primaryKeyName))
278 DataType colType = definition.getType();
279 Object valueObj = MusicDataStoreHandle.getDSHandle().getColValue(row, colName, colType);
280 Object valueString = MusicUtil.convertToActualDataType(colType, valueObj);
281 fieldValueString.append(colName + " = ?");
282 updateQuery.addValue(valueString);
283 if (counter != (totalColumns - 1))
284 fieldValueString.append(",");
285 counter = counter + 1;
287 updateQuery.appendQueryString("UPDATE " + keyspace + "." + table + " SET "
288 + fieldValueString + " WHERE " + primaryKeyName + "= ? " + ";");
289 updateQuery.addValue(cqlFormattedPrimaryKeyValue);
291 MusicDataStoreHandle.getDSHandle().executePut(updateQuery, "critical");
292 } catch (MusicServiceException | MusicQueryException e) {
293 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.QUERYERROR +""+updateQuery ,ErrorSeverity.MAJOR, ErrorTypes.QUERYERROR);
305 public ResultSet quorumGet(PreparedQueryObject query) {
306 ResultSet results = null;
308 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
309 } catch (MusicServiceException | MusicQueryException e) {
310 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
321 * @param fullyQualifiedKey lockName
324 public String whoseTurnIsIt(String fullyQualifiedKey) {
325 String[] splitString = fullyQualifiedKey.split("\\.");
326 String keyspace = splitString[0];
327 String table = splitString[1];
328 String primaryKeyValue = splitString[2];
330 return "$" + fullyQualifiedKey + "$"
331 + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
332 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
333 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
340 * @param lockReference
343 public static String getLockNameFromId(String lockReference) {
344 StringTokenizer st = new StringTokenizer(lockReference);
345 return st.nextToken("$");
349 public void destroyLockRef(String lockId) {
350 long start = System.currentTimeMillis();
351 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
352 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
353 String[] splitString = fullyQualifiedKey.split("\\.");
354 String keyspace = splitString[0];
355 String table = splitString[1];
356 String primaryKeyValue = splitString[2];
358 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef);
359 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
360 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
362 long end = System.currentTimeMillis();
363 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
366 public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) {
367 long start = System.currentTimeMillis();
368 String[] splitString = fullyQualifiedKey.split("\\.");
369 String keyspace = splitString[0];
370 String table = splitString[1];
371 String primaryKeyValue = splitString[2];
373 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference);
374 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
375 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockReference ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
377 long end = System.currentTimeMillis();
378 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
379 return new MusicLockState(LockStatus.UNLOCKED, "");
383 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
384 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
385 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
386 if (voluntaryRelease) {
387 return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
389 return forciblyReleaseLock(fullyQualifiedKey, lockRef);
393 public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) {
394 return destroyLockRef(fullyQualifiedKey, lockReference);
397 public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) {
398 String[] splitString = fullyQualifiedKey.split("\\.");
399 String keyspace = splitString[0];
400 String table = splitString[1];
402 //leave a signal that this key could potentially be unsynchronized
403 String syncTable = keyspace+".unsyncedKeys_"+table;
404 PreparedQueryObject queryObject = new PreparedQueryObject();
405 String values = "(?)";
406 queryObject.addValue(fullyQualifiedKey);
407 String insQuery = "insert into "+syncTable+" (key) values "+values+";";
408 queryObject.appendQueryString(insQuery);
410 MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
411 } catch (Exception e) {
412 logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
416 //now release the lock
417 return destroyLockRef(fullyQualifiedKey, lockReference);
423 * @throws MusicLockingException
425 public void deleteLock(String lockName) throws MusicLockingException {
    // NOTE(review): the body of this method is not visible in this excerpt; what it does with
    // lockName, and when it throws MusicLockingException, must be confirmed against the full file.
429 // Prepared Query Additions.
435 * @throws MusicServiceException
437 public ReturnType eventualPut(PreparedQueryObject queryObject) {
438 boolean result = false;
440 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
441 } catch (MusicServiceException | MusicQueryException ex) {
442 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
443 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
444 return new ReturnType(ResultType.FAILURE, ex.getMessage());
447 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
449 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
457 * @throws MusicServiceException
459 public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
460 boolean result = false;
462 PreparedQueryObject getGaurd = new PreparedQueryObject();
463 getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
464 getGaurd.addValue(primaryKey);
466 ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
467 Row row = getGaurdResult.one();
469 guard = row.getLong("guard");
470 long timeOfWrite = System.currentTimeMillis();
471 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
472 String query = queryObject.getQuery();
473 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
474 if (queryObject.getOperation().equalsIgnoreCase("delete"))
475 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
477 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
479 queryObject.replaceQueryString(query);
482 } catch (MusicServiceException | MusicQueryException e) {
483 logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage());
486 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
487 } catch (MusicServiceException | MusicQueryException ex) {
488 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
489 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
490 return new ReturnType(ResultType.FAILURE, ex.getMessage());
493 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
495 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
503 * @param primaryKeyValue
508 public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
509 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
510 long start = System.currentTimeMillis();
512 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
513 lockId.substring(lockId.lastIndexOf("$")+1));
514 if(result.getResult().equals(ResultType.FAILURE))
515 return result;//not top of the lock store q
517 if (conditionInfo != null)
519 if (conditionInfo.testCondition() == false)
520 return new ReturnType(ResultType.FAILURE,
521 "Lock acquired but the condition is not true");
522 } catch (Exception e) {
523 return new ReturnType(ResultType.FAILURE,
524 "Exception thrown while checking the condition, check its sanctity:\n"
528 String query = queryObject.getQuery();
529 long timeOfWrite = System.currentTimeMillis();
530 long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
531 long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
532 // TODO: use Statement instead of modifying query
533 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
534 if (queryObject.getOperation().equalsIgnoreCase("delete"))
535 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
536 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
537 query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
539 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
541 queryObject.replaceQueryString(query);
542 MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
543 dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
544 long end = System.currentTimeMillis();
545 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
546 }catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
547 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
548 return new ReturnType(ResultType.FAILURE,
549 "Exception thrown while doing the critical put\n"
552 return new ReturnType(ResultType.SUCCESS, "Update performed");
560 * @return Boolean Indicates success or failure
561 * @throws MusicServiceException
565 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
566 // this is mainly for some functions like keyspace creation etc which does not
567 // really need the bells and whistles of Music locking.
568 boolean result = false;
570 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
571 } catch (MusicQueryException | MusicServiceException ex) {
572 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
573 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
574 throw new MusicServiceException(ex.getMessage());
576 return result ? ResultType.SUCCESS : ResultType.FAILURE;
580 * This method performs DDL operation on cassandra.
582 * @param queryObject query object containing prepared query and values
584 * @throws MusicServiceException
586 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
587 ResultSet results = null;
589 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
590 } catch (MusicQueryException | MusicServiceException e) {
591 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
592 throw new MusicServiceException(e.getMessage());
598 * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
599 * is used to check if the resource is free.
601 * @param keyspace name of the keyspace
602 * @param table name of the table
603 * @param primaryKeyValue primary key value
604 * @param queryObject query object containing prepared query and values
605 * @param lockId lock ID to check if the resource is free to perform the operation.
608 public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
609 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
610 ResultSet results = null;
613 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
614 lockId.substring(lockId.lastIndexOf("$")+1));
615 if(result.getResult().equals(ResultType.FAILURE))
616 return null;//not top of the lock store q
617 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
618 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
619 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
625 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
627 * @param keyspaceName name of the keyspace
628 * @param tableName name of the table
629 * @param primaryKey primary key value
630 * @param queryObject query object containing prepared query and values
632 * @throws MusicLockingException
633 * @throws MusicServiceException
634 * @throws MusicQueryException
636 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
637 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
638 long start = System.currentTimeMillis();
639 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
640 String lockId = createLockReference(fullyQualifiedKey);
641 long lockCreationTime = System.currentTimeMillis();
642 ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
643 long lockAcqTime = System.currentTimeMillis();
645 if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
646 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
647 voluntaryReleaseLock(fullyQualifiedKey,lockId);
648 return lockAcqResult;
651 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
652 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
653 ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
654 queryObject, lockRef, conditionInfo);
655 long criticalPutTime = System.currentTimeMillis();
656 voluntaryReleaseLock(fullyQualifiedKey,lockId);
657 long lockDeleteTime = System.currentTimeMillis();
658 String timingInfo = "|lock creation time:" + (lockCreationTime - start)
659 + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
660 + "|critical put time:" + (criticalPutTime - lockAcqTime)
661 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
662 criticalPutResult.setTimingInfo(timingInfo);
663 return criticalPutResult;
670 * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
672 * @param keyspaceName name of the keyspace
673 * @param tableName name of the table
674 * @param primaryKey primary key value
675 * @param queryObject query object containing prepared query and values
677 * @throws MusicServiceException
678 * @throws MusicLockingException
679 * @throws MusicQueryException
681 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
682 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
683 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
684 String lockId = createLockReference(fullyQualifiedKey);
685 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
686 ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
687 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
688 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
689 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
691 criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
692 voluntaryReleaseLock(fullyQualifiedKey,lockId);
695 voluntaryReleaseLock(fullyQualifiedKey,lockId);
696 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
707 public Map<String, Object> validateLock(String lockName) {
708 Map<String, Object> resultMap = new HashMap<>();
709 String[] locks = lockName.split("\\.");
710 if(locks.length < 3) {
711 resultMap.put("Error", "Invalid lock. Please make sure lock is of the type keyspaceName.tableName.primaryKey");
714 String keyspace= locks[0];
715 if(keyspace.startsWith("$"))
716 keyspace = keyspace.substring(1);
717 resultMap.put("keyspace",keyspace);
722 public static void main(String[] args) {
723 String x = "axe top";
724 x = x.replaceFirst("top", "sword");
725 System.out.print(x); //returns sword pickaxe
731 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
732 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
    // NOTE(review): the body of this method is not visible in this excerpt; its return value and
    // the conditions under which it throws MusicLockingException must be confirmed against the full file.
737 public List<String> getLockQueue(String fullyQualifiedKey)
738 throws MusicServiceException, MusicQueryException, MusicLockingException {
739 String[] splitString = fullyQualifiedKey.split("\\.");
740 String keyspace = splitString[0];
741 String table = splitString[1];
742 String primaryKeyValue = splitString[2];
744 return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
747 public long getLockQueueSize(String fullyQualifiedKey)
748 throws MusicServiceException, MusicQueryException, MusicLockingException {
749 String[] splitString = fullyQualifiedKey.split("\\.");
750 String keyspace = splitString[0];
751 String table = splitString[1];
752 String primaryKeyValue = splitString[2];
754 return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
757 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
758 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {