2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
23 package org.onap.music.service.impl;
25 import java.io.StringWriter;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.StringTokenizer;
31 import org.onap.music.datastore.MusicDataStore;
32 import org.onap.music.datastore.MusicDataStoreHandle;
33 import org.onap.music.datastore.PreparedQueryObject;
34 import org.onap.music.eelf.logging.EELFLoggerDelegate;
35 import org.onap.music.eelf.logging.format.AppMessages;
36 import org.onap.music.eelf.logging.format.ErrorSeverity;
37 import org.onap.music.eelf.logging.format.ErrorTypes;
38 import org.onap.music.exceptions.MusicLockingException;
39 import org.onap.music.exceptions.MusicQueryException;
40 import org.onap.music.exceptions.MusicServiceException;
41 import org.onap.music.lockingservice.cassandra.CassaLockStore;
42 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
43 import org.onap.music.lockingservice.cassandra.MusicLockState;
44 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
45 import org.onap.music.main.MusicUtil;
46 import org.onap.music.main.ResultType;
47 import org.onap.music.main.ReturnType;
48 import org.onap.music.service.MusicCoreService;
50 import com.datastax.driver.core.ColumnDefinitions;
51 import com.datastax.driver.core.ColumnDefinitions.Definition;
52 import com.datastax.driver.core.DataType;
53 import com.datastax.driver.core.ResultSet;
54 import com.datastax.driver.core.Row;
55 import com.datastax.driver.core.TableMetadata;
56 import com.google.common.util.concurrent.Monitor.Guard;
58 import org.onap.music.datastore.*;
60 public class MusicCassaCore implements MusicCoreService {
62 public static CassaLockStore mLockHandle = null;;
63 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
64 private static boolean unitTestRun=true;
65 private static MusicCassaCore musicCassaCoreInstance = null;
/** Private so that instances are only obtained through getInstance(). */
private MusicCassaCore() {
    // nothing to initialise
}
70 public static MusicCassaCore getInstance() {
72 if(musicCassaCoreInstance == null) {
73 musicCassaCoreInstance = new MusicCassaCore();
75 return musicCassaCoreInstance;
78 public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
79 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
80 long start = System.currentTimeMillis();
82 if (mLockHandle == null) {
84 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
85 } catch (Exception e) {
86 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
87 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
90 long end = System.currentTimeMillis();
91 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
97 public String createLockReference(String fullyQualifiedKey) {
98 String[] splitString = fullyQualifiedKey.split("\\.");
99 String keyspace = splitString[0];
100 String table = splitString[1];
101 String lockName = splitString[2];
103 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
104 long start = System.currentTimeMillis();
105 String lockReference = null;
107 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
108 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
111 long end = System.currentTimeMillis();
112 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
113 return lockReference;
117 public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
118 evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
119 return acquireLock(fullyQualifiedKey, lockReference);
122 private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
124 String[] splitString = fullyQualifiedKey.split("\\.");
125 String keyspace = splitString[0];
126 String table = splitString[1];
127 String primaryKeyValue = splitString[2];
129 LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
131 /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the
134 long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
135 if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
136 forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.lockRef+"");
137 logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.lockRef+" forcibly released");
141 private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
143 //return failure to lock holders too early or already evicted from the lock store
144 String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
145 long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
146 long lockReferenceL = Long.parseLong(lockReference);
148 if(lockReferenceL > topOfLockStoreL) {
149 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
150 return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
154 if(lockReferenceL < topOfLockStoreL) {
155 logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
156 return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
159 return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
162 public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
163 throws MusicLockingException, MusicQueryException, MusicServiceException {
164 String[] splitString = lockId.split("\\.");
165 String keyspace = splitString[0].substring(1);//remove '$'
166 String table = splitString[1];
167 String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
168 fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
169 String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
171 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockRef);
173 if(result.getResult().equals(ResultType.FAILURE))
174 return result;//not top of the lock store q
176 //check to see if the value of the key has to be synced in case there was a forceful release
177 String syncTable = keyspace+".unsyncedKeys_"+table;
178 String query = "select * from "+syncTable+" where key='"+fullyQualifiedKey+"';";
179 PreparedQueryObject readQueryObject = new PreparedQueryObject();
180 readQueryObject.appendQueryString(query);
181 ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
182 if (results.all().size() != 0) {
183 logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
185 syncQuorum(keyspace, table, primaryKeyValue);
186 } catch (Exception e) {
187 StringWriter sw = new StringWriter();
188 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
189 String exceptionAsString = sw.toString();
190 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
192 String cleanQuery = "delete from music_internal.unsynced_keys where key='"+fullyQualifiedKey+"';";
193 PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
194 deleteQueryObject.appendQueryString(cleanQuery);
195 MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
198 getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
200 return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
207 * @param tableQueryObject
209 * @return Boolean Indicates success or failure
210 * @throws MusicServiceException
214 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject, String consistency) throws MusicServiceException {
215 boolean result = false;
218 //create shadow locking table
219 result = getLockingServiceHandle().createLockQueue(keyspace, table);
221 return ResultType.FAILURE;
225 //create table to track unsynced_keys
226 table = "unsyncedKeys_"+table;
228 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
229 + " ( key text,PRIMARY KEY (key) );";
230 System.out.println(tabQuery);
231 PreparedQueryObject queryObject = new PreparedQueryObject();
233 queryObject.appendQueryString(tabQuery);
235 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
238 //create actual table
239 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
240 } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
241 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
242 throw new MusicServiceException(ex.getMessage());
244 return result?ResultType.SUCCESS:ResultType.FAILURE;
247 private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
248 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
249 PreparedQueryObject selectQuery = new PreparedQueryObject();
250 PreparedQueryObject updateQuery = new PreparedQueryObject();
252 // get the primary key d
253 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
254 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
256 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
257 Object cqlFormattedPrimaryKeyValue =
258 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
260 // get the row of data from a quorum
261 selectQuery.appendQueryString("SELECT * FROM " + keyspace + "." + table + " WHERE "
262 + primaryKeyName + "= ?" + ";");
263 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
264 ResultSet results = null;
266 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(selectQuery);
267 // write it back to a quorum
268 Row row = results.one();
269 ColumnDefinitions colInfo = row.getColumnDefinitions();
270 int totalColumns = colInfo.size();
272 StringBuilder fieldValueString = new StringBuilder("");
273 for (Definition definition : colInfo) {
274 String colName = definition.getName();
275 if (colName.equals(primaryKeyName))
277 DataType colType = definition.getType();
278 Object valueObj = MusicDataStoreHandle.getDSHandle().getColValue(row, colName, colType);
279 Object valueString = MusicUtil.convertToActualDataType(colType, valueObj);
280 fieldValueString.append(colName + " = ?");
281 updateQuery.addValue(valueString);
282 if (counter != (totalColumns - 1))
283 fieldValueString.append(",");
284 counter = counter + 1;
286 updateQuery.appendQueryString("UPDATE " + keyspace + "." + table + " SET "
287 + fieldValueString + " WHERE " + primaryKeyName + "= ? " + ";");
288 updateQuery.addValue(cqlFormattedPrimaryKeyValue);
290 MusicDataStoreHandle.getDSHandle().executePut(updateQuery, "critical");
291 } catch (MusicServiceException | MusicQueryException e) {
292 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.QUERYERROR +""+updateQuery ,ErrorSeverity.MAJOR, ErrorTypes.QUERYERROR);
304 public ResultSet quorumGet(PreparedQueryObject query) {
305 ResultSet results = null;
307 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
308 } catch (MusicServiceException | MusicQueryException e) {
309 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
320 * @param fullyQualifiedKey lockName
323 public String whoseTurnIsIt(String fullyQualifiedKey) {
324 String[] splitString = fullyQualifiedKey.split("\\.");
325 String keyspace = splitString[0];
326 String table = splitString[1];
327 String primaryKeyValue = splitString[2];
329 return "$" + fullyQualifiedKey + "$"
330 + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
331 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
332 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
339 * @param lockReference
342 public static String getLockNameFromId(String lockReference) {
343 StringTokenizer st = new StringTokenizer(lockReference);
344 return st.nextToken("$");
348 public void destroyLockRef(String lockId) {
349 long start = System.currentTimeMillis();
350 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
351 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
352 String[] splitString = fullyQualifiedKey.split("\\.");
353 String keyspace = splitString[0];
354 String table = splitString[1];
355 String primaryKeyValue = splitString[2];
357 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef);
358 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
359 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
361 long end = System.currentTimeMillis();
362 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
365 public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) {
366 long start = System.currentTimeMillis();
367 String[] splitString = fullyQualifiedKey.split("\\.");
368 String keyspace = splitString[0];
369 String table = splitString[1];
370 String primaryKeyValue = splitString[2];
372 getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference);
373 } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
374 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockReference ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
376 long end = System.currentTimeMillis();
377 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
378 return new MusicLockState(LockStatus.UNLOCKED, "");
382 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
383 String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
384 String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
385 if (voluntaryRelease) {
386 return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
388 return forciblyReleaseLock(fullyQualifiedKey, lockRef);
392 public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) {
393 return destroyLockRef(fullyQualifiedKey, lockReference);
396 public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) {
397 String[] splitString = fullyQualifiedKey.split("\\.");
398 String keyspace = splitString[0];
399 String table = splitString[1];
401 //leave a signal that this key could potentially be unsynchronized
402 String syncTable = keyspace+".unsyncedKeys_"+table;
403 PreparedQueryObject queryObject = new PreparedQueryObject();
404 String values = "(?)";
405 queryObject.addValue(fullyQualifiedKey);
406 String insQuery = "insert into "+syncTable+" (key) values "+values+";";
407 queryObject.appendQueryString(insQuery);
409 MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
410 } catch (Exception e) {
411 logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
415 //now release the lock
416 return destroyLockRef(fullyQualifiedKey, lockReference);
422 * @throws MusicLockingException
424 public void deleteLock(String lockName) throws MusicLockingException {
428 // Prepared Query Additions.
434 * @throws MusicServiceException
436 public ReturnType eventualPut(PreparedQueryObject queryObject) {
437 boolean result = false;
439 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
440 } catch (MusicServiceException | MusicQueryException ex) {
441 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
442 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
443 return new ReturnType(ResultType.FAILURE, ex.getMessage());
446 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
448 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
456 * @throws MusicServiceException
458 public ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
459 boolean result = false;
461 PreparedQueryObject getGaurd = new PreparedQueryObject();
462 getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
463 getGaurd.addValue(primaryKey);
465 ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
466 Row row = getGaurdResult.one();
468 guard = row.getLong("guard");
469 long timeOfWrite = System.currentTimeMillis();
470 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
471 String query = queryObject.getQuery();
472 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
473 if (queryObject.getOperation().equalsIgnoreCase("delete"))
474 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
476 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
478 queryObject.replaceQueryString(query);
481 } catch (MusicServiceException | MusicQueryException e) {
482 logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage());
485 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
486 } catch (MusicServiceException | MusicQueryException ex) {
487 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
488 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
489 return new ReturnType(ResultType.FAILURE, ex.getMessage());
492 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
494 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
502 * @param primaryKeyValue
507 public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
508 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
509 long start = System.currentTimeMillis();
511 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
512 lockId.substring(lockId.lastIndexOf("$")+1));
513 if(result.getResult().equals(ResultType.FAILURE))
514 return result;//not top of the lock store q
516 if (conditionInfo != null)
518 if (conditionInfo.testCondition() == false)
519 return new ReturnType(ResultType.FAILURE,
520 "Lock acquired but the condition is not true");
521 } catch (Exception e) {
522 return new ReturnType(ResultType.FAILURE,
523 "Exception thrown while checking the condition, check its sanctity:\n"
527 String query = queryObject.getQuery();
528 long timeOfWrite = System.currentTimeMillis();
529 long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
530 long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
531 // TODO: use Statement instead of modifying query
532 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
533 if (queryObject.getOperation().equalsIgnoreCase("delete"))
534 query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
535 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
536 query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
538 query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
540 queryObject.replaceQueryString(query);
541 MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
542 dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
543 long end = System.currentTimeMillis();
544 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
545 }catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
546 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
547 return new ReturnType(ResultType.FAILURE,
548 "Exception thrown while doing the critical put\n"
551 return new ReturnType(ResultType.SUCCESS, "Update performed");
559 * @return Boolean Indicates success or failure
560 * @throws MusicServiceException
564 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
565 // this is mainly for some functions like keyspace creation etc which does not
566 // really need the bells and whistles of Music locking.
567 boolean result = false;
569 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
570 } catch (MusicQueryException | MusicServiceException ex) {
571 logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
572 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
573 throw new MusicServiceException(ex.getMessage());
575 return result ? ResultType.SUCCESS : ResultType.FAILURE;
579 * This method performs DDL operation on cassandra.
581 * @param queryObject query object containing prepared query and values
583 * @throws MusicServiceException
585 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
586 ResultSet results = null;
588 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
589 } catch (MusicQueryException | MusicServiceException e) {
590 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
591 throw new MusicServiceException(e.getMessage());
597 * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
598 * is used to check if the resource is free.
600 * @param keyspace name of the keyspace
601 * @param table name of the table
602 * @param primaryKeyValue primary key value
603 * @param queryObject query object containing prepared query and values
604 * @param lockId lock ID to check if the resource is free to perform the operation.
607 public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
608 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
609 ResultSet results = null;
612 ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
613 lockId.substring(lockId.lastIndexOf("$")+1));
614 if(result.getResult().equals(ResultType.FAILURE))
615 return null;//not top of the lock store q
616 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
617 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
618 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
624 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
626 * @param keyspaceName name of the keyspace
627 * @param tableName name of the table
628 * @param primaryKey primary key value
629 * @param queryObject query object containing prepared query and values
631 * @throws MusicLockingException
632 * @throws MusicServiceException
633 * @throws MusicQueryException
635 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
636 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
637 long start = System.currentTimeMillis();
638 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
639 String lockId = createLockReference(fullyQualifiedKey);
640 long lockCreationTime = System.currentTimeMillis();
641 ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
642 long lockAcqTime = System.currentTimeMillis();
644 if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
645 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
646 voluntaryReleaseLock(fullyQualifiedKey,lockId);
647 return lockAcqResult;
650 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
651 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
652 ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
653 queryObject, lockRef, conditionInfo);
654 long criticalPutTime = System.currentTimeMillis();
655 voluntaryReleaseLock(fullyQualifiedKey,lockId);
656 long lockDeleteTime = System.currentTimeMillis();
657 String timingInfo = "|lock creation time:" + (lockCreationTime - start)
658 + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
659 + "|critical put time:" + (criticalPutTime - lockAcqTime)
660 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
661 criticalPutResult.setTimingInfo(timingInfo);
662 return criticalPutResult;
669 * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
671 * @param keyspaceName name of the keyspace
672 * @param tableName name of the table
673 * @param primaryKey primary key value
674 * @param queryObject query object containing prepared query and values
676 * @throws MusicServiceException
677 * @throws MusicLockingException
678 * @throws MusicQueryException
680 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
681 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
682 String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
683 String lockId = createLockReference(fullyQualifiedKey);
684 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
685 ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
686 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
687 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
688 String lockRef = lockId.substring(lockId.lastIndexOf("$"));
690 criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
691 voluntaryReleaseLock(fullyQualifiedKey,lockId);
694 voluntaryReleaseLock(fullyQualifiedKey,lockId);
695 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
706 public Map<String, Object> validateLock(String lockName) {
707 Map<String, Object> resultMap = new HashMap<>();
708 String[] locks = lockName.split("\\.");
709 if(locks.length < 3) {
710 resultMap.put("Error", "Invalid lock. Please make sure lock is of the type keyspaceName.tableName.primaryKey");
713 String keyspace= locks[0];
714 if(keyspace.startsWith("$"))
715 keyspace = keyspace.substring(1);
716 resultMap.put("keyspace",keyspace);
721 public static void main(String[] args) {
722 String x = "axe top";
723 x = x.replaceFirst("top", "sword");
724 System.out.print(x); //returns sword pickaxe
730 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
731 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
736 public List<String> getLockQueue(String fullyQualifiedKey)
737 throws MusicServiceException, MusicQueryException, MusicLockingException {
738 String[] splitString = fullyQualifiedKey.split("\\.");
739 String keyspace = splitString[0];
740 String table = splitString[1];
741 String primaryKeyValue = splitString[2];
743 return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
746 public long getLockQueueSize(String fullyQualifiedKey)
747 throws MusicServiceException, MusicQueryException, MusicLockingException {
748 String[] splitString = fullyQualifiedKey.split("\\.");
749 String keyspace = splitString[0];
750 String table = splitString[1];
751 String primaryKeyValue = splitString[2];
753 return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
756 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
757 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {