2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (c) 2018 IBM.
7 * ===================================================================
8 * Modifications Copyright (c) 2019 Samsung
9 * ===================================================================
10 * Licensed under the Apache License, Version 2.0 (the "License");
11 * you may not use this file except in compliance with the License.
12 * You may obtain a copy of the License at
14 * http://www.apache.org/licenses/LICENSE-2.0
16 * Unless required by applicable law or agreed to in writing, software
17 * distributed under the License is distributed on an "AS IS" BASIS,
18 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19 * See the License for the specific language governing permissions and
20 * limitations under the License.
22 * ============LICENSE_END=============================================
23 * ====================================================================
26 package org.onap.music.service.impl;
28 import java.io.StringWriter;
29 import java.util.List;
31 import java.util.StringTokenizer;
33 import javax.ws.rs.core.MultivaluedMap;
35 import org.onap.music.datastore.Condition;
36 import org.onap.music.datastore.MusicDataStore;
37 import org.onap.music.datastore.MusicDataStoreHandle;
38 import org.onap.music.datastore.PreparedQueryObject;
39 import org.onap.music.datastore.jsonobjects.JsonDelete;
40 import org.onap.music.datastore.jsonobjects.JsonIndex;
41 import org.onap.music.datastore.jsonobjects.JsonInsert;
42 import org.onap.music.datastore.jsonobjects.JsonKeySpace;
43 import org.onap.music.datastore.jsonobjects.JsonSelect;
44 import org.onap.music.datastore.jsonobjects.JsonTable;
45 import org.onap.music.datastore.jsonobjects.JsonUpdate;
46 import org.onap.music.eelf.logging.EELFLoggerDelegate;
47 import org.onap.music.eelf.logging.format.AppMessages;
48 import org.onap.music.eelf.logging.format.ErrorSeverity;
49 import org.onap.music.eelf.logging.format.ErrorTypes;
50 import org.onap.music.exceptions.MusicLockingException;
51 import org.onap.music.exceptions.MusicQueryException;
52 import org.onap.music.exceptions.MusicServiceException;
53 import org.onap.music.lockingservice.cassandra.CassaLockStore;
54 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
55 import org.onap.music.lockingservice.cassandra.LockType;
56 import org.onap.music.lockingservice.cassandra.MusicLockState;
57 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
58 import org.onap.music.main.MusicUtil;
59 import org.onap.music.main.ResultType;
60 import org.onap.music.main.ReturnType;
61 import org.onap.music.service.MusicCoreService;
63 import com.datastax.driver.core.DataType;
64 import com.datastax.driver.core.ResultSet;
65 import com.datastax.driver.core.Row;
66 import com.datastax.driver.core.TableMetadata;
68 public class MusicCassaCore implements MusicCoreService {
70 private static CassaLockStore mLockHandle = null;
71 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
72 private static MusicCassaCore musicCassaCoreInstance = null;
/**
 * Private constructor: instances are only obtained through {@link #getInstance()}.
 */
private MusicCassaCore() {
    // nothing to initialise
}
78 public static CassaLockStore getmLockHandle() {
82 public static void setmLockHandle(CassaLockStore mLockHandle) {
83 MusicCassaCore.mLockHandle = mLockHandle;
86 public static MusicCassaCore getInstance() {
88 if(musicCassaCoreInstance == null) {
89 musicCassaCoreInstance = new MusicCassaCore();
91 return musicCassaCoreInstance;
97 public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
98 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
99 long start = System.currentTimeMillis();
101 if (mLockHandle == null) {
103 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
104 } catch (Exception e) {
105 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
106 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
109 long end = System.currentTimeMillis();
110 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
114 public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
115 return createLockReference(fullyQualifiedKey, LockType.WRITE);
118 public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
119 String[] splitString = fullyQualifiedKey.split("\\.");
120 String keyspace = splitString[0];
121 String table = splitString[1];
122 String lockName = splitString[2];
124 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
125 long start = System.currentTimeMillis();
126 String lockReference = null;
129 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype);
130 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
131 logger.error(EELFLoggerDelegate.applicationLogger, e);
132 throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
133 } catch (Exception e) {
134 logger.error(EELFLoggerDelegate.applicationLogger, e);
135 throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
137 long end = System.currentTimeMillis();
138 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
139 return lockReference;
143 public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
144 throws MusicLockingException, MusicQueryException, MusicServiceException {
145 evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
146 return acquireLock(fullyQualifiedKey, lockReference);
149 private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod)
150 throws MusicLockingException, MusicQueryException, MusicServiceException {
151 String[] splitString = fullyQualifiedKey.split("\\.");
152 String keyspace = splitString[0];
153 String table = splitString[1];
154 String primaryKeyValue = splitString[2];
156 LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
158 if (!currentLockHolderObject.getIsLockOwner()) { // no lock holder
162 * Release the lock of the previous holder if it has expired. if the update to the acquire time has
163 * not reached due to network delays, simply use the create time as the reference
165 long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.getAcquireTime()),
166 Long.parseLong(currentLockHolderObject.getCreateTime()));
167 if ((System.currentTimeMillis() - referenceTime) > leasePeriod) {
168 forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.getLockRef() + "");
169 logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.getLockRef() + " forcibly released");
173 public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
174 throws MusicLockingException, MusicQueryException, MusicServiceException {
175 String[] splitString = lockId.split("\\.");
176 String keyspace = splitString[0].substring(1);//remove '$'
177 String table = splitString[1];
178 String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
179 String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
180 String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
182 LockObject lockInfo = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue, lockRef);
184 if (!lockInfo.getIsLockOwner()) {
185 return new ReturnType(ResultType.FAILURE, lockId + " is not a lock holder");//not top of the lock store q
188 //check to see if the value of the key has to be synced in case there was a forceful release
189 String syncTable = keyspace+".unsyncedKeys_"+table;
190 String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
191 PreparedQueryObject readQueryObject = new PreparedQueryObject();
192 readQueryObject.appendQueryString(query);
193 ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
194 if (!results.all().isEmpty()) {
195 logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
197 syncQuorum(keyspace, table, primaryKeyValue);
198 } catch (Exception e) {
199 StringWriter sw = new StringWriter();
200 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
201 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
202 String exceptionAsString = sw.toString();
203 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
205 String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
206 PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
207 deleteQueryObject.appendQueryString(cleanQuery);
208 MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
211 getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
213 return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
220 * @param tableQueryObject
222 * @return Boolean Indicates success or failure
223 * @throws MusicServiceException
227 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
228 String consistency) throws MusicServiceException {
229 boolean result = false;
232 // create shadow locking table
233 result = getLockingServiceHandle().createLockQueue(keyspace, table);
235 return ResultType.FAILURE;
239 // create table to track unsynced_keys
240 table = "unsyncedKeys_" + table;
243 "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " ( key text,PRIMARY KEY (key) );";
244 PreparedQueryObject queryObject = new PreparedQueryObject();
246 queryObject.appendQueryString(tabQuery);
248 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
250 // create actual table
251 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
252 } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
253 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
254 ErrorTypes.MUSICSERVICEERROR);
255 throw new MusicServiceException(ex.getMessage());
257 return result ? ResultType.SUCCESS : ResultType.FAILURE;
260 private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
261 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
262 PreparedQueryObject selectQuery = new PreparedQueryObject();
263 PreparedQueryObject updateQuery = new PreparedQueryObject();
265 // get the primary key d
266 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
267 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
269 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
270 Object cqlFormattedPrimaryKeyValue =
271 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
273 // get the row of data from a quorum
274 selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
275 + primaryKeyName + "= ?" + ";");
276 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
277 MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
278 cqlFormattedPrimaryKeyValue);
286 public ResultSet quorumGet(PreparedQueryObject query) {
287 ResultSet results = null;
289 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
290 } catch (MusicServiceException | MusicQueryException e) {
291 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR,
292 ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
298 public String whoseTurnIsIt(String fullyQualifiedKey) {
299 String[] splitString = fullyQualifiedKey.split("\\.");
300 String keyspace = splitString[0];
301 String table = splitString[1];
302 String primaryKeyValue = splitString[2];
304 LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
305 if (!lockOwner.getIsLockOwner()) {
308 return "$" + fullyQualifiedKey + "$" + lockOwner.getLockRef();
309 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
310 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.LOCKINGERROR + fullyQualifiedKey,
311 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
316 public List<String> getCurrentLockHolders(String fullyQualifiedKey) {
317 String[] splitString = fullyQualifiedKey.split("\\.");
318 String keyspace = splitString[0];
319 String table = splitString[1];
320 String primaryKeyValue = splitString[2];
322 return getLockingServiceHandle().getCurrentLockHolders(keyspace, table, primaryKeyValue);
323 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
324 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
331 * @param lockReference
334 public static String getLockNameFromId(String lockReference) {
335 StringTokenizer st = new StringTokenizer(lockReference);
336 return st.nextToken("$");
340 public void destroyLockRef(String lockId) throws MusicLockingException {
341 long start = System.currentTimeMillis();
342 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
343 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
344 String[] splitString = fullyQualifiedKey.split("\\.");
345 String keyspace = splitString[0];
346 String table = splitString[1];
347 String primaryKeyValue = splitString[2];
349 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
350 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
351 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
352 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
353 throw new MusicLockingException(e.getMessage());
355 long end = System.currentTimeMillis();
356 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
359 public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
360 long start = System.currentTimeMillis();
361 String[] splitString = fullyQualifiedKey.split("\\.");
362 String keyspace = splitString[0];
363 String table = splitString[1];
364 String primaryKeyValue = splitString[2];
366 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
367 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
368 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
369 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
370 throw new MusicLockingException(e.getMessage());
372 long end = System.currentTimeMillis();
373 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
374 return new MusicLockState(LockStatus.UNLOCKED, "");
378 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
379 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
380 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
381 if (voluntaryRelease) {
382 return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
384 return forciblyReleaseLock(fullyQualifiedKey, lockRef);
388 public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
389 throws MusicLockingException {
390 MusicLockState result = null;
392 result = destroyLockRef(fullyQualifiedKey, lockReference);
393 } catch (Exception ex) {
394 logger.info(EELFLoggerDelegate.applicationLogger,
395 "Exception in voluntaryReleaseLock() for " + fullyQualifiedKey + "ref: " + lockReference);
396 throw new MusicLockingException(ex.getMessage());
401 public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
402 String[] splitString = fullyQualifiedKey.split("\\.");
403 String keyspace = splitString[0];
404 String table = splitString[1];
406 //leave a signal that this key could potentially be unsynchronized
407 String syncTable = keyspace+".unsyncedKeys_"+table;
408 PreparedQueryObject queryObject = new PreparedQueryObject();
409 String values = "(?)";
410 queryObject.addValue(fullyQualifiedKey);
411 String insQuery = "insert into "+syncTable+" (key) values "+values+";";
412 queryObject.appendQueryString(insQuery);
414 MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
415 } catch (Exception e) {
416 logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
417 + e.getMessage(), e);
420 //now release the lock
421 return destroyLockRef(fullyQualifiedKey, lockReference);
427 * @throws MusicLockingException
430 public void deleteLock(String lockName) throws MusicLockingException {
431 throw new MusicLockingException("Depreciated Method Delete Lock");
434 // Prepared Query Additions.
440 * @throws MusicServiceException
442 public ReturnType eventualPut(PreparedQueryObject queryObject) {
443 boolean result = false;
445 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
446 } catch (MusicServiceException | MusicQueryException ex) {
447 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
448 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
449 return new ReturnType(ResultType.FAILURE, ex.getMessage());
452 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
454 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
462 * @throws MusicServiceException
464 public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
465 boolean result = false;
467 PreparedQueryObject getGaurd = new PreparedQueryObject();
468 getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
469 getGaurd.addValue(primaryKey);
471 ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
472 Row row = getGaurdResult.one();
474 guard = row.getLong("guard");
475 long timeOfWrite = System.currentTimeMillis();
476 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
477 String query = queryObject.getQuery();
478 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
479 if (queryObject.getOperation().equalsIgnoreCase("delete"))
480 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
482 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
484 queryObject.replaceQueryString(query);
487 } catch (MusicServiceException | MusicQueryException e) {
488 logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
491 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
492 } catch (MusicServiceException | MusicQueryException ex) {
493 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
494 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
495 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " ", ex);
496 return new ReturnType(ResultType.FAILURE, ex.getMessage());
499 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
501 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
509 * @param primaryKeyValue
514 public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
515 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
516 long start = System.currentTimeMillis();
518 String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
519 if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
520 return new ReturnType(ResultType.FAILURE,"Lock value '" + keyLock + "' and key value '"
521 + primaryKeyValue + "' not match. Please check your values: "
524 LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
525 lockId.substring(lockId.lastIndexOf("$") + 1));
527 if ( lockObject == null ) {
528 return new ReturnType(ResultType.FAILURE, lockId + " does not exist.");
529 } else if (!lockObject.getIsLockOwner()) {
530 return new ReturnType(ResultType.FAILURE, lockId + " is not the lock holder");
531 } else if (lockObject.getLocktype() != LockType.WRITE) {
532 return new ReturnType(ResultType.FAILURE,
533 "Attempting to do write operation, but " + lockId + " is a read lock");
536 if (conditionInfo != null) {
538 if (conditionInfo.testCondition() == false)
539 return new ReturnType(ResultType.FAILURE, "Lock acquired but the condition is not true");
540 } catch (Exception e) {
541 logger.error(EELFLoggerDelegate.errorLogger, e);
542 return new ReturnType(ResultType.FAILURE,
543 "Exception thrown while checking the condition, check its sanctity:\n" + e.getMessage());
546 String query = queryObject.getQuery();
547 long timeOfWrite = System.currentTimeMillis();
548 long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$") + 1));
549 long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
550 // TODO: use Statement instead of modifying query
551 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
552 if (queryObject.getOperation().equalsIgnoreCase("delete"))
553 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
554 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
555 query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
557 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
559 queryObject.replaceQueryString(query);
560 MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
561 dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
562 long end = System.currentTimeMillis();
563 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
564 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
565 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
566 return new ReturnType(ResultType.FAILURE,
567 "Exception thrown while doing the critical put: "
570 return new ReturnType(ResultType.SUCCESS, "Update performed");
578 * @return Boolean Indicates success or failure
579 * @throws MusicServiceException
583 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException,MusicQueryException {
584 // this is mainly for some functions like keyspace creation etc which does not
585 // really need the bells and whistles of Music locking.
586 boolean result = false;
588 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
589 // } catch (MusicQueryException | MusicServiceException ex) {
590 // logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
591 // ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
592 // throw new MusicServiceException(ex.getMessage(),ex);
594 return result ? ResultType.SUCCESS : ResultType.FAILURE;
598 * This method performs DDL operation on cassandra.
600 * @param queryObject query object containing prepared query and values
602 * @throws MusicServiceException
604 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
605 ResultSet results = null;
607 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
608 } catch (MusicQueryException | MusicServiceException e) {
609 logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
610 throw new MusicServiceException(e.getMessage());
616 * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
617 * is used to check if the resource is free.
619 * @param keyspace name of the keyspace
620 * @param table name of the table
621 * @param primaryKeyValue primary key value
622 * @param queryObject query object containing prepared query and values
623 * @param lockId lock ID to check if the resource is free to perform the operation.
626 public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
627 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
628 ResultSet results = null;
629 String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
631 if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
632 throw new MusicLockingException("Lock value '" + keyLock + "' and key value '"
633 + primaryKeyValue + "' do not match. Please check your values: "
636 LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
637 lockId.substring(lockId.lastIndexOf("$") + 1));
638 if (null == lockObject) {
639 throw new MusicLockingException("No Lock Object. Please check if lock name or key is correct."
642 if ( !lockObject.getIsLockOwner()) {
643 return null;// not top of the lock store q
645 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
646 } catch ( MusicLockingException e ) {
647 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
648 .WARN, ErrorTypes.MUSICSERVICEERROR);
649 throw new MusicServiceException(
650 "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());
651 } catch (MusicQueryException | MusicServiceException e) {
652 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
653 .WARN, ErrorTypes.MUSICSERVICEERROR, e);
654 throw new MusicServiceException(
655 "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());
661 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
663 * @param keyspaceName name of the keyspace
664 * @param tableName name of the table
665 * @param primaryKey primary key value
666 * @param queryObject query object containing prepared query and values
668 * @throws MusicLockingException
669 * @throws MusicServiceException
670 * @throws MusicQueryException
672 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
673 PreparedQueryObject queryObject, Condition conditionInfo)
674 throws MusicLockingException, MusicQueryException, MusicServiceException {
675 long start = System.currentTimeMillis();
676 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
677 String lockId = createLockReference(fullyQualifiedKey, LockType.WRITE);
678 long lockCreationTime = System.currentTimeMillis();
679 ReturnType lockAcqResult = null;
680 logger.info(EELFLoggerDelegate.applicationLogger,
681 "***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : " + primaryKey);
682 logger.info(EELFLoggerDelegate.applicationLogger,
683 "***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
684 if (conditionInfo != null) {
685 logger.info(EELFLoggerDelegate.applicationLogger,
686 "***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
689 lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
690 } catch (MusicLockingException ex) {
691 logger.error(EELFLoggerDelegate.errorLogger,
692 "Exception while acquireLockWithLease() in atomic put for key: " + primaryKey);
693 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
694 throw new MusicServiceException(
695 "Cannot perform atomic put for key: " + primaryKey + " : " + ex.getMessage());
697 long lockAcqTime = System.currentTimeMillis();
700 * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { logger.info(EELFLoggerDelegate.
701 * applicationLogger,"unable to acquire lock, id " + lockId);
702 * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
705 logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
706 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
707 ReturnType criticalPutResult = null;
708 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
709 criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef, conditionInfo);
710 long criticalPutTime = System.currentTimeMillis();
711 long lockDeleteTime = System.currentTimeMillis();
712 String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
713 + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
714 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
715 criticalPutResult.setTimingInfo(timingInfo);
717 logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
718 criticalPutResult = lockAcqResult;
721 voluntaryReleaseLock(fullyQualifiedKey, lockId);
722 } catch (MusicLockingException ex) {
723 logger.info(EELFLoggerDelegate.applicationLogger,
724 "Exception occured while deleting lock after atomic put for key: " + primaryKey);
725 criticalPutResult.setMessage(criticalPutResult.getMessage() + "Lock release failed");
727 return criticalPutResult;
733 * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
735 * @param keyspaceName name of the keyspace
736 * @param tableName name of the table
737 * @param primaryKey primary key value
738 * @param queryObject query object containing prepared query and values
740 * @throws MusicServiceException
741 * @throws MusicLockingException
742 * @throws MusicQueryException
744 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
745 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
746 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
747 String lockId = createLockReference(fullyQualifiedKey, LockType.READ);
748 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
749 ReturnType lockAcqResult = null;
750 ResultSet result = null;
751 logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());
753 lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
754 } catch (MusicLockingException ex) {
755 logger.error(EELFLoggerDelegate.errorLogger,
756 "Exception while acquireLockWithLease() in atomic get for key: " + primaryKey);
757 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
758 throw new MusicServiceException(
759 "Cannot perform atomic get for key: " + primaryKey + " : " + ex.getMessage());
761 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
762 logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
763 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
764 result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
766 logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
769 voluntaryReleaseLock(fullyQualifiedKey, lockId);
770 } catch (MusicLockingException ex) {
771 logger.info(EELFLoggerDelegate.applicationLogger,
772 "Exception occured while deleting lock after atomic put for key: " + primaryKey);
773 throw new MusicLockingException(ex.getMessage());
785 public Map<String, Object> validateLock(String lockName) {
786 return MusicUtil.validateLock(lockName);
791 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
792 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
797 public List<String> getLockQueue(String fullyQualifiedKey)
798 throws MusicServiceException, MusicQueryException, MusicLockingException {
799 String[] splitString = fullyQualifiedKey.split("\\.");
800 String keyspace = splitString[0];
801 String table = splitString[1];
802 String primaryKeyValue = splitString[2];
804 return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
807 public long getLockQueueSize(String fullyQualifiedKey)
808 throws MusicServiceException, MusicQueryException, MusicLockingException {
809 String[] splitString = fullyQualifiedKey.split("\\.");
810 String keyspace = splitString[0];
811 String table = splitString[1];
812 String primaryKeyValue = splitString[2];
814 return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
819 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
820 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
825 //Methods added for ORM changes
827 public ResultType createKeyspace(JsonKeySpace jsonKeySpaceObject,String consistencyInfo)
828 throws MusicServiceException,MusicQueryException {
829 ResultType result = nonKeyRelatedPut(jsonKeySpaceObject.genCreateKeyspaceQuery(), consistencyInfo);
830 logger.info(EELFLoggerDelegate.applicationLogger, " Keyspace Creation Process completed successfully");
835 public ResultType dropKeyspace(JsonKeySpace jsonKeySpaceObject, String consistencyInfo)
836 throws MusicServiceException,MusicQueryException {
837 ResultType result = nonKeyRelatedPut(jsonKeySpaceObject.genDropKeyspaceQuery(),
839 logger.info(EELFLoggerDelegate.applicationLogger, " Keyspace deletion Process completed successfully");
843 public ResultType createTable(JsonTable jsonTableObject, String consistencyInfo)
844 throws MusicServiceException, MusicQueryException {
845 ResultType result = null;
847 result = createTable(jsonTableObject.getKeyspaceName(),
848 jsonTableObject.getTableName(), jsonTableObject.genCreateTableQuery(), consistencyInfo);
850 } catch (MusicServiceException ex) {
851 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
852 ErrorTypes.MUSICSERVICEERROR);
853 throw new MusicServiceException(ex.getMessage());
855 logger.info(EELFLoggerDelegate.applicationLogger, " Table Creation Process completed successfully ");
859 public ResultType dropTable(JsonTable jsonTableObject,String consistencyInfo)
860 throws MusicServiceException,MusicQueryException {
861 ResultType result = nonKeyRelatedPut(jsonTableObject.genDropTableQuery(),
863 logger.info(EELFLoggerDelegate.applicationLogger, " Table deletion Process completed successfully ");
869 public ResultType createIndex(JsonIndex jsonIndexObject, String consistencyInfo)
870 throws MusicServiceException, MusicQueryException{
871 ResultType result = nonKeyRelatedPut(jsonIndexObject.genCreateIndexQuery(),
874 logger.info(EELFLoggerDelegate.applicationLogger, " Index creation Process completed successfully ");
879 * This method performs DDL operation on cassandra.
881 * @param queryObject query object containing prepared query and values
883 * @throws MusicServiceException
885 public ResultSet select(JsonSelect jsonSelect, MultivaluedMap<String, String> rowParams)
886 throws MusicServiceException, MusicQueryException {
887 ResultSet results = null;
889 results = get(jsonSelect.genSelectQuery(rowParams));
890 } catch (MusicServiceException e) {
891 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
892 throw new MusicServiceException(e.getMessage());
900 public ResultSet selectCritical(JsonInsert jsonInsertObj, MultivaluedMap<String, String> rowParams)
901 throws MusicLockingException, MusicQueryException, MusicServiceException {
903 ResultSet results = null;
904 String consistency = "";
905 if(null != jsonInsertObj && null != jsonInsertObj.getConsistencyInfo()) {
906 consistency = jsonInsertObj.getConsistencyInfo().get("type");
909 String lockId = jsonInsertObj.getConsistencyInfo().get("lockId");
911 PreparedQueryObject queryObject = jsonInsertObj.genSelectCriticalPreparedQueryObj(rowParams);
913 if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
914 results = criticalGet(jsonInsertObj.getKeyspaceName(), jsonInsertObj.getTableName(),
915 jsonInsertObj.getPrimaryKeyVal(), queryObject,lockId);
916 } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
917 results = atomicGet(jsonInsertObj.getKeyspaceName(), jsonInsertObj.getTableName(),
918 jsonInsertObj.getPrimaryKeyVal(), queryObject);
925 * this is insert row into Table
927 public ReturnType insertIntoTable(JsonInsert jsonInsertObj)
928 throws MusicLockingException, MusicQueryException, MusicServiceException {
930 String consistency = "";
931 if(null != jsonInsertObj && null != jsonInsertObj.getConsistencyInfo()) {
932 consistency = jsonInsertObj.getConsistencyInfo().get("type");
935 ReturnType result = null;
938 PreparedQueryObject queryObj = null;
939 queryObj = jsonInsertObj.genInsertPreparedQueryObj();
941 if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
942 result = eventualPut(jsonInsertObj.genInsertPreparedQueryObj());
943 } else if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
944 String lockId = jsonInsertObj.getConsistencyInfo().get("lockId");
946 logger.error(EELFLoggerDelegate.errorLogger,"LockId cannot be null. Create lock reference or"
947 + " use ATOMIC instead of CRITICAL", ErrorSeverity.FATAL, ErrorTypes.MUSICSERVICEERROR);
948 return new ReturnType(ResultType.FAILURE, "LockId cannot be null. Create lock "
949 + "and acquire lock or use ATOMIC instead of CRITICAL");
951 result = criticalPut(jsonInsertObj.getKeyspaceName(),
952 jsonInsertObj.getTableName(), jsonInsertObj.getPrimaryKeyVal(), jsonInsertObj.genInsertPreparedQueryObj(), lockId,null);
953 } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
954 result = atomicPut(jsonInsertObj.getKeyspaceName(), jsonInsertObj.getTableName(),
955 jsonInsertObj.getPrimaryKeyVal(), jsonInsertObj.genInsertPreparedQueryObj(), null);
957 } catch (Exception ex) {
958 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
959 .WARN, ErrorTypes.MUSICSERVICEERROR, ex);
960 return new ReturnType(ResultType.FAILURE, ex.getMessage());
967 * This is insert row into Table
969 public ReturnType updateTable(JsonUpdate jsonUpdateObj, MultivaluedMap<String, String> rowParams)
970 throws MusicLockingException, MusicQueryException, MusicServiceException {
972 ReturnType result = null;
973 String consistency = "";
974 if(null != jsonUpdateObj && null != jsonUpdateObj.getConsistencyInfo()) {
975 consistency = jsonUpdateObj.getConsistencyInfo().get("type");
977 PreparedQueryObject queryObject = jsonUpdateObj.genUpdatePreparedQueryObj(rowParams);
979 Condition conditionInfo;
980 if (jsonUpdateObj.getConditions() == null) {
981 conditionInfo = null;
983 // to avoid parsing repeatedly, just send the select query to obtain row
984 PreparedQueryObject selectQuery = new PreparedQueryObject();
985 selectQuery.appendQueryString("SELECT * FROM " + jsonUpdateObj.getKeyspaceName() + "." + jsonUpdateObj.getTableName() + " WHERE "
986 + jsonUpdateObj.getRowIdString() + ";");
987 selectQuery.addValue(jsonUpdateObj.getPrimarKeyValue());
988 conditionInfo = new Condition(jsonUpdateObj.getConditions(), selectQuery);
992 if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
993 result = eventualPut(queryObject);
994 } else if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
995 String lockId = jsonUpdateObj.getConsistencyInfo().get("lockId");
997 logger.error(EELFLoggerDelegate.errorLogger,"LockId cannot be null. Create lock reference or"
998 + " use ATOMIC instead of CRITICAL", ErrorSeverity.FATAL, ErrorTypes.MUSICSERVICEERROR);
1000 return new ReturnType(ResultType.FAILURE, "LockId cannot be null. Create lock "
1001 + "and acquire lock or use ATOMIC instead of CRITICAL");
1003 result = criticalPut(jsonUpdateObj.getKeyspaceName(), jsonUpdateObj.getTableName(), jsonUpdateObj.getPrimarKeyValue(),
1004 queryObject, lockId, conditionInfo);
1005 } else if (consistency.equalsIgnoreCase("atomic_delete_lock")) {
1006 // this function is mainly for the benchmarks
1008 result = atomicPutWithDeleteLock(jsonUpdateObj.getKeyspaceName(), jsonUpdateObj.getTableName(),
1009 jsonUpdateObj.getPrimarKeyValue(), queryObject, conditionInfo);
1010 } catch (MusicLockingException e) {
1011 logger.error(EELFLoggerDelegate.errorLogger,e, AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN,
1012 ErrorTypes.GENERALSERVICEERROR, e);
1013 throw new MusicLockingException(AppMessages.UNKNOWNERROR.toString());
1016 } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
1018 result = atomicPut(jsonUpdateObj.getKeyspaceName(), jsonUpdateObj.getTableName(), jsonUpdateObj.getPrimarKeyValue(),
1019 queryObject, conditionInfo);
1020 } catch (MusicLockingException e) {
1021 logger.error(EELFLoggerDelegate.errorLogger,e, AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.GENERALSERVICEERROR, e);
1022 throw new MusicLockingException(AppMessages.UNKNOWNERROR.toString());
1024 } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL_NB)) {
1026 result = eventualPut_nb(queryObject, jsonUpdateObj.getKeyspaceName(),
1027 jsonUpdateObj.getTableName(), jsonUpdateObj.getPrimarKeyValue());
1028 }catch (Exception e) {
1029 return new ReturnType(ResultType.FAILURE, e.getMessage());
1038 * This method is for Delete From Table
1040 public ReturnType deleteFromTable(JsonDelete jsonDeleteObj, MultivaluedMap<String, String> rowParams)
1041 throws MusicLockingException, MusicQueryException, MusicServiceException {
1043 ReturnType result = null;
1044 String consistency = "";
1045 if(null != jsonDeleteObj && null != jsonDeleteObj.getConsistencyInfo()) {
1046 consistency = jsonDeleteObj.getConsistencyInfo().get("type");
1048 PreparedQueryObject queryObject = jsonDeleteObj.genDeletePreparedQueryObj(rowParams);
1050 // get the conditional, if any
1051 Condition conditionInfo;
1052 if (jsonDeleteObj.getConditions() == null) {
1053 conditionInfo = null;
1055 // to avoid parsing repeatedly, just send the select query to obtain row
1056 PreparedQueryObject selectQuery = new PreparedQueryObject();
1057 selectQuery.appendQueryString("SELECT * FROM " + jsonDeleteObj.getKeyspaceName() + "." + jsonDeleteObj.getTableName() + " WHERE "
1058 + jsonDeleteObj.getRowIdString() + ";");
1059 selectQuery.addValue(jsonDeleteObj.getPrimarKeyValue());
1060 conditionInfo = new Condition(jsonDeleteObj.getConditions(), selectQuery);
1063 if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL))
1064 result = eventualPut(queryObject);
1065 else if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
1066 String lockId = jsonDeleteObj.getConsistencyInfo().get("lockId");
1067 if(lockId == null) {
1068 logger.error(EELFLoggerDelegate.errorLogger,"LockId cannot be null. Create lock reference or"
1069 + " use ATOMIC instead of CRITICAL", ErrorSeverity.FATAL, ErrorTypes.MUSICSERVICEERROR);
1071 return new ReturnType(ResultType.FAILURE, "LockId cannot be null. Create lock "
1072 + "and acquire lock or use ATOMIC instead of CRITICAL");
1074 result = criticalPut(jsonDeleteObj.getKeyspaceName(),
1075 jsonDeleteObj.getTableName(), jsonDeleteObj.getPrimarKeyValue(),
1076 queryObject, lockId, conditionInfo);
1077 } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
1078 result = atomicPut(jsonDeleteObj.getKeyspaceName(),
1079 jsonDeleteObj.getTableName(), jsonDeleteObj.getPrimarKeyValue(),
1080 queryObject, conditionInfo);
1081 } else if(consistency.equalsIgnoreCase(MusicUtil.EVENTUAL_NB)) {
1082 result = eventualPut_nb(queryObject, jsonDeleteObj.getKeyspaceName(),
1083 jsonDeleteObj.getTableName(), jsonDeleteObj.getPrimarKeyValue());