Some bug fixes and Minor Chages.
[music.git] / src / main / java / org / onap / music / service / impl / MusicCassaCore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (c) 2018 IBM. 
7  * ===================================================================
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.service.impl;
27
28 import java.io.StringWriter;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.StringTokenizer;
32 import java.util.concurrent.TimeUnit;
33
34 import org.onap.music.datastore.MusicDataStore;
35 import org.onap.music.datastore.MusicDataStoreHandle;
36 import org.onap.music.datastore.PreparedQueryObject;
37 import org.onap.music.eelf.logging.EELFLoggerDelegate;
38 import org.onap.music.eelf.logging.format.AppMessages;
39 import org.onap.music.eelf.logging.format.ErrorSeverity;
40 import org.onap.music.eelf.logging.format.ErrorTypes;
41 import org.onap.music.exceptions.MusicLockingException;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.lockingservice.cassandra.CassaLockStore;
45 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
46 import org.onap.music.lockingservice.cassandra.LockType;
47 import org.onap.music.lockingservice.cassandra.MusicLockState;
48 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
49 import org.onap.music.main.MusicUtil;
50 import org.onap.music.main.ResultType;
51 import org.onap.music.main.ReturnType;
52 import org.onap.music.service.MusicCoreService;
53
54 import com.att.eelf.configuration.EELFLogger;
55 import com.datastax.driver.core.DataType;
56 import com.datastax.driver.core.ResultSet;
57 import com.datastax.driver.core.Row;
58 import com.datastax.driver.core.TableMetadata;
59
60 import org.onap.music.datastore.*;
61
62 public class MusicCassaCore implements MusicCoreService {
63
64     public static CassaLockStore mLockHandle = null;;
65     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
66     private static boolean unitTestRun=true;
67     private static MusicCassaCore musicCassaCoreInstance = null;
68
69     private MusicCassaCore() {
70
71     }
72     public static MusicCassaCore getInstance() {
73
74         if(musicCassaCoreInstance == null) {
75             musicCassaCoreInstance = new MusicCassaCore();
76         }
77         return musicCassaCoreInstance;
78     }
79
80     public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
81         logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
82         long start = System.currentTimeMillis();
83
84         if (mLockHandle == null) {
85             try {
86                 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
87             } catch (Exception e) {
88                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
89                 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
90             }
91         }
92         long end = System.currentTimeMillis();
93         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
94         return mLockHandle;
95     }
96
97     public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
98         return createLockReference(fullyQualifiedKey, LockType.WRITE);
99     }
100
101     public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
102         String[] splitString = fullyQualifiedKey.split("\\.");
103         String keyspace = splitString[0];
104         String table = splitString[1];
105         String lockName = splitString[2];
106
107         logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
108         long start = System.currentTimeMillis();
109         String lockReference = null;
110         
111         try {
112             lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype);
113         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
114             e.printStackTrace();
115             throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
116         } catch (Exception e) {
117             logger.error(EELFLoggerDelegate.applicationLogger, e);
118             throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
119         }
120         long end = System.currentTimeMillis();
121         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
122         return lockReference;
123     }
124
125
126     public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
127             throws MusicLockingException, MusicQueryException, MusicServiceException  {
128          evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
129          return acquireLock(fullyQualifiedKey, lockReference);
130     }
131
132     private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod)
133             throws MusicLockingException, MusicQueryException, MusicServiceException {
134         String[] splitString = fullyQualifiedKey.split("\\.");
135         String keyspace = splitString[0];
136         String table = splitString[1];
137         String primaryKeyValue = splitString[2];
138
139         LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
140
141         if (!currentLockHolderObject.getIsLockOwner()) { // no lock holder
142             return;
143         }
144         /*
145          * Release the lock of the previous holder if it has expired. if the update to the acquire time has
146          * not reached due to network delays, simply use the create time as the reference
147          */
148         long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.getAcquireTime()),
149                 Long.parseLong(currentLockHolderObject.getCreateTime()));
150         if ((System.currentTimeMillis() - referenceTime) > leasePeriod) {
151             forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.getLockRef() + "");
152             logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.getLockRef() + " forcibly released");
153         }
154     }
155
156     public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
157             throws MusicLockingException, MusicQueryException, MusicServiceException {
158         String[] splitString = lockId.split("\\.");
159         String keyspace = splitString[0].substring(1);//remove '$'
160         String table = splitString[1];
161         String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
162         String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
163         String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
164
165         LockObject lockInfo = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue, lockRef);
166
167         if (!lockInfo.getIsLockOwner()) {
168             return new ReturnType(ResultType.FAILURE, lockId + " is not a lock holder");//not top of the lock store q
169         }
170    
171         //check to see if the value of the key has to be synced in case there was a forceful release
172         String syncTable = keyspace+".unsyncedKeys_"+table;
173         String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
174         PreparedQueryObject readQueryObject = new PreparedQueryObject();
175         readQueryObject.appendQueryString(query);
176         ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
177         if (results.all().size() != 0) {
178             logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
179             try {
180                 syncQuorum(keyspace, table, primaryKeyValue);
181             } catch (Exception e) {
182                 StringWriter sw = new StringWriter();
183                     logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
184                         ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
185                 String exceptionAsString = sw.toString();
186                 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
187             }
188             String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
189             PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
190             deleteQueryObject.appendQueryString(cleanQuery);
191             MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
192         }
193
194         getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
195
196         return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
197     }
198
199
200
201     /**
202      *
203      * @param tableQueryObject
204      * @param consistency
205      * @return Boolean Indicates success or failure
206      * @throws MusicServiceException
207      *
208      *
209      */
210     public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
211             String consistency) throws MusicServiceException {
212         boolean result = false;
213
214         try {
215             // create shadow locking table
216             result = getLockingServiceHandle().createLockQueue(keyspace, table);
217             if (result == false)
218                 return ResultType.FAILURE;
219
220             result = false;
221
222             // create table to track unsynced_keys
223             table = "unsyncedKeys_" + table;
224
225             String tabQuery =
226                     "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " ( key text,PRIMARY KEY (key) );";
227             PreparedQueryObject queryObject = new PreparedQueryObject();
228
229             queryObject.appendQueryString(tabQuery);
230             result = false;
231             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
232
233             // create actual table
234             result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
235         } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
236             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
237                     ErrorTypes.MUSICSERVICEERROR);
238             throw new MusicServiceException(ex.getMessage());
239         }
240         return result ? ResultType.SUCCESS : ResultType.FAILURE;
241     }
242
243     private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
244         logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
245         PreparedQueryObject selectQuery = new PreparedQueryObject();
246         PreparedQueryObject updateQuery = new PreparedQueryObject();
247
248         // get the primary key d
249         TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
250         String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
251                                                                             // primary key
252         DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
253         Object cqlFormattedPrimaryKeyValue =
254                         MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
255
256         // get the row of data from a quorum
257         selectQuery.appendQueryString("SELECT *  FROM " + keyspace + "." + table + " WHERE "
258                         + primaryKeyName + "= ?" + ";");
259         selectQuery.addValue(cqlFormattedPrimaryKeyValue);
260         MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
261             cqlFormattedPrimaryKeyValue);
262     }
263
264     /**
265      *
266      * @param query
267      * @return ResultSet
268      */
269     public ResultSet quorumGet(PreparedQueryObject query) {
270         ResultSet results = null;
271         try {
272             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
273         } catch (MusicServiceException | MusicQueryException e) {
274             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR,
275                 ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
276
277         }
278         return results;
279     }
280
281     public String whoseTurnIsIt(String fullyQualifiedKey) {
282         String[] splitString = fullyQualifiedKey.split("\\.");
283         String keyspace = splitString[0];
284         String table = splitString[1];
285         String primaryKeyValue = splitString[2];
286         try {
287             LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
288             if (!lockOwner.getIsLockOwner()) {
289                 return null;
290             }
291             return "$" + fullyQualifiedKey + "$" + lockOwner.getLockRef();
292         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
293             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.LOCKINGERROR + fullyQualifiedKey,
294                     ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
295         }
296         return null;
297     }
298     
299     public List<String> getCurrentLockHolders(String fullyQualifiedKey) {
300         String[] splitString = fullyQualifiedKey.split("\\.");
301         String keyspace = splitString[0];
302         String table = splitString[1];
303         String primaryKeyValue = splitString[2];
304         try {
305                 return getLockingServiceHandle().getCurrentLockHolders(keyspace, table, primaryKeyValue);
306         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
307             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
308         }
309         return null;
310     }
311
312     /**
313      *
314      * @param lockReference
315      * @return
316      */
317     public static String getLockNameFromId(String lockReference) {
318         StringTokenizer st = new StringTokenizer(lockReference);
319         return st.nextToken("$");
320     }
321
322     @Override
323     public void destroyLockRef(String lockId) throws MusicLockingException {
324         long start = System.currentTimeMillis();
325         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
326         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
327         String[] splitString = fullyQualifiedKey.split("\\.");
328         String keyspace = splitString[0];
329         String table = splitString[1];
330         String primaryKeyValue = splitString[2];
331         try {
332             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
333         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
334             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
335                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
336             throw new MusicLockingException(e.getMessage());
337         }
338         long end = System.currentTimeMillis();
339         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
340     }
341
342     public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
343         long start = System.currentTimeMillis();
344         String[] splitString = fullyQualifiedKey.split("\\.");
345         String keyspace = splitString[0];
346         String table = splitString[1];
347         String primaryKeyValue = splitString[2];
348         try {
349             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
350         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
351             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
352                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
353             throw new MusicLockingException(e.getMessage());
354         }
355         long end = System.currentTimeMillis();
356         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
357         return new MusicLockState(LockStatus.UNLOCKED, "");
358     }
359
360     @Override
361     public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
362         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
363         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
364         if (voluntaryRelease) {
365             return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
366         } else {
367             return forciblyReleaseLock(fullyQualifiedKey, lockRef);
368         }
369     }
370
371     public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
372             throws MusicLockingException {
373         MusicLockState result = null;
374         try {
375             result = destroyLockRef(fullyQualifiedKey, lockReference);
376         } catch (Exception ex) {
377             logger.info(EELFLoggerDelegate.applicationLogger,
378                     "Exception in voluntaryReleaseLock() for " + fullyQualifiedKey + "ref: " + lockReference);
379             throw new MusicLockingException(ex.getMessage());
380         }
381         return result;
382     }
383
384     public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
385         String[] splitString = fullyQualifiedKey.split("\\.");
386         String keyspace = splitString[0];
387         String table = splitString[1];
388
389         //leave a signal that this key could potentially be unsynchronized
390         String syncTable = keyspace+".unsyncedKeys_"+table;
391         PreparedQueryObject queryObject = new PreparedQueryObject();
392         String values = "(?)";
393         queryObject.addValue(fullyQualifiedKey);
394         String insQuery = "insert into "+syncTable+" (key) values "+values+";";
395         queryObject.appendQueryString(insQuery);
396         try {
397             MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
398         } catch (Exception e) {
399             logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
400                         + e.getMessage(), e);
401         }
402
403         //now release the lock
404         return destroyLockRef(fullyQualifiedKey, lockReference);
405     }
406
407     /**
408      *
409      * @param lockName
410      * @throws MusicLockingException
411      */
412     @Deprecated
413     public  void deleteLock(String lockName) throws MusicLockingException {
414         throw new MusicLockingException("Depreciated Method Delete Lock");
415     }
416
417     // Prepared Query Additions.
418
419     /**
420      *
421      * @param queryObject
422      * @return ReturnType
423      * @throws MusicServiceException
424      */
425     public  ReturnType eventualPut(PreparedQueryObject queryObject) {
426         boolean result = false;
427         try {
428             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
429         } catch (MusicServiceException | MusicQueryException ex) {
430             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle "  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
431             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " " + ex);
432             return new ReturnType(ResultType.FAILURE, ex.getMessage());
433         }
434         if (result) {
435             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
436         } else {
437             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
438         }
439     }
440
441     /**
442      *
443      * @param queryObject
444      * @return ReturnType
445      * @throws MusicServiceException
446      */
447     public  ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
448         boolean result = false;
449         long guard = 0;
450         PreparedQueryObject getGaurd = new PreparedQueryObject();
451         getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
452         getGaurd.addValue(primaryKey);
453         try {
454             ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
455             Row row = getGaurdResult.one();
456             if (row != null) {
457                 guard = row.getLong("guard");
458                 long timeOfWrite = System.currentTimeMillis();
459                 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
460                 String query = queryObject.getQuery();
461                 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
462                     if (queryObject.getOperation().equalsIgnoreCase("delete"))
463                         query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
464                     else
465                         query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
466                 }
467                 queryObject.replaceQueryString(query);
468             }
469
470         } catch (MusicServiceException | MusicQueryException e) {
471             logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
472         }
473         try {
474             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
475         } catch (MusicServiceException | MusicQueryException ex) {
476             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
477                 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
478             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " ", ex);
479             return new ReturnType(ResultType.FAILURE, ex.getMessage());
480         }
481         if (result) {
482             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
483         } else {
484             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
485         }
486     }
487
488     /**
489      *
490      * @param keyspace
491      * @param table
492      * @param primaryKeyValue
493      * @param queryObject
494      * @param lockId
495      * @return
496      */
497     public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
498             PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
499         long start = System.currentTimeMillis();
500         try {
501             String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
502             if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
503                 return new ReturnType(ResultType.FAILURE,"Lock value '" + keyLock + "' and key value '"
504                 + primaryKeyValue + "' not match. Please check your values: " 
505                 + lockId + " .");
506             }
507             LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
508                     lockId.substring(lockId.lastIndexOf("$") + 1));
509
510             if ( lockObject == null ) {
511                 return new ReturnType(ResultType.FAILURE, lockId + " does not exist.");
512             } else if (!lockObject.getIsLockOwner()) {
513                 return new ReturnType(ResultType.FAILURE, lockId + " is not the lock holder");
514             } else if (lockObject.getLocktype() != LockType.WRITE) {
515                 return new ReturnType(ResultType.FAILURE,
516                         "Attempting to do write operation, but " + lockId + " is a read lock");
517             }
518
519             if (conditionInfo != null) {
520                 try {
521                     if (conditionInfo.testCondition() == false)
522                         return new ReturnType(ResultType.FAILURE, "Lock acquired but the condition is not true");
523                 } catch (Exception e) {
524                     logger.error(EELFLoggerDelegate.errorLogger, e);
525                     return new ReturnType(ResultType.FAILURE,
526                             "Exception thrown while checking the condition, check its sanctity:\n" + e.getMessage());
527                 }
528             }
529             String query = queryObject.getQuery();
530             long timeOfWrite = System.currentTimeMillis();
531             long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$") + 1));
532             long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
533             // TODO: use Statement instead of modifying query
534             if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
535                 if (queryObject.getOperation().equalsIgnoreCase("delete"))
536                     query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
537                 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
538                     query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
539                 else
540                     query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
541             }
542             queryObject.replaceQueryString(query);
543             MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
544             dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
545             long end = System.currentTimeMillis();
546             logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
547         } catch (MusicQueryException | MusicServiceException | MusicLockingException  e) {
548             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
549             return new ReturnType(ResultType.FAILURE,
550                 "Exception thrown while doing the critical put: "
551                 + e.getMessage());
552         }
553         return new ReturnType(ResultType.SUCCESS, "Update performed");
554     }
555
556
557     /**
558      *
559      * @param queryObject
560      * @param consistency
561      * @return Boolean Indicates success or failure
562      * @throws MusicServiceException
563      *
564      *
565      */
566     public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException,MusicQueryException {
567         // this is mainly for some functions like keyspace creation etc which does not
568         // really need the bells and whistles of Music locking.
569         boolean result = false;
570 //        try {
571         result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
572 //        } catch (MusicQueryException | MusicServiceException ex) {
573             // logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
574             //     ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
575 //            throw new MusicServiceException(ex.getMessage(),ex);
576 //        }
577         return result ? ResultType.SUCCESS : ResultType.FAILURE;
578     }
579
580     /**
581      * This method performs DDL operation on cassandra.
582      *
583      * @param queryObject query object containing prepared query and values
584      * @return ResultSet
585      * @throws MusicServiceException
586      */
587     public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
588         ResultSet results = null;
589         try {
590             results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
591         } catch (MusicQueryException | MusicServiceException e) {
592             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
593             throw new MusicServiceException(e.getMessage());
594         }
595         return results;
596     }
597
598     /**
599      * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
600      * is used to check if the resource is free.
601      *
602      * @param keyspace name of the keyspace
603      * @param table name of the table
604      * @param primaryKeyValue primary key value
605      * @param queryObject query object containing prepared query and values
606      * @param lockId lock ID to check if the resource is free to perform the operation.
607      * @return ResultSet
608      */
609     public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
610                     PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
611         ResultSet results = null;
612         String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
613         try {
614             if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
615                 throw new MusicLockingException("Lock value '" + keyLock + "' and key value '"
616                 + primaryKeyValue + "' do not match. Please check your values: " 
617                 + lockId + " .");
618             }
619             LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
620                 lockId.substring(lockId.lastIndexOf("$") + 1));
621             if (null == lockObject) {
622                 throw new MusicLockingException("No Lock Object. Please check if lock name or key is correct." 
623                     + lockId + " .");
624             }
625             if ( !lockObject.getIsLockOwner()) {
626                 return null;// not top of the lock store q
627             }
628             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
629         } catch ( MusicLockingException e ) {
630             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
631                     .WARN, ErrorTypes.MUSICSERVICEERROR);
632                 throw new MusicServiceException(
633                     "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());
634         } catch (MusicQueryException | MusicServiceException e) {
635             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
636                     .WARN, ErrorTypes.MUSICSERVICEERROR, e);
637                 throw new MusicServiceException(
638                     "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());    
639         }
640         return results;
641     }
642
643     /**
644      * This method performs DML operation on cassandra, when the lock of the dd is acquired.
645      *
646      * @param keyspaceName name of the keyspace
647      * @param tableName name of the table
648      * @param primaryKey primary key value
649      * @param queryObject query object containing prepared query and values
650      * @return ReturnType
651      * @throws MusicLockingException
652      * @throws MusicServiceException
653      * @throws MusicQueryException
654      */
655     public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
656             PreparedQueryObject queryObject, Condition conditionInfo)
657             throws MusicLockingException, MusicQueryException, MusicServiceException {
658         long start = System.currentTimeMillis();
659         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
660         String lockId = createLockReference(fullyQualifiedKey, LockType.WRITE);
661         long lockCreationTime = System.currentTimeMillis();
662         ReturnType lockAcqResult = null;
663         logger.info(EELFLoggerDelegate.applicationLogger,
664                 "***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : " + primaryKey);
665         logger.info(EELFLoggerDelegate.applicationLogger,
666                 "***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
667         if (conditionInfo != null) {
668             logger.info(EELFLoggerDelegate.applicationLogger,
669                     "***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
670         }
671         try {
672             lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
673         } catch (MusicLockingException ex) {
674             logger.error(EELFLoggerDelegate.errorLogger,
675                     "Exception while acquireLockWithLease() in atomic put for key: " + primaryKey);
676             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
677             throw new MusicServiceException(
678                     "Cannot perform atomic put for key: " + primaryKey + " : " + ex.getMessage());
679         }
680         long lockAcqTime = System.currentTimeMillis();
681
682         /*
683          * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { logger.info(EELFLoggerDelegate.
684          * applicationLogger,"unable to acquire lock, id " + lockId);
685          * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
686          */
687
688         logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
689         String lockRef = lockId.substring(lockId.lastIndexOf("$"));
690         ReturnType criticalPutResult = null;
691         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
692             criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef, conditionInfo);
693             long criticalPutTime = System.currentTimeMillis();
694             long lockDeleteTime = System.currentTimeMillis();
695             String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
696                     + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
697                     + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
698             criticalPutResult.setTimingInfo(timingInfo);
699         } else {
700             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
701             criticalPutResult = lockAcqResult;
702         }
703         try {
704             voluntaryReleaseLock(fullyQualifiedKey, lockId);
705         } catch (MusicLockingException ex) {
706             logger.info(EELFLoggerDelegate.applicationLogger,
707                     "Exception occured while deleting lock after atomic put for key: " + primaryKey);
708             criticalPutResult.setMessage(criticalPutResult.getMessage() + "Lock release failed");
709         }
710         return criticalPutResult;
711     }
712
713
714
715     /**
716      * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
717      *
718      * @param keyspaceName name of the keyspace
719      * @param tableName name of the table
720      * @param primaryKey primary key value
721      * @param queryObject query object containing prepared query and values
722      * @return ResultSet
723      * @throws MusicServiceException
724      * @throws MusicLockingException
725      * @throws MusicQueryException
726      */
727     public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
728             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
729         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
730         String lockId = createLockReference(fullyQualifiedKey, LockType.READ);
731         long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
732         ReturnType lockAcqResult = null;
733         ResultSet result = null;
734         logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());
735         try {
736             lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
737         } catch (MusicLockingException ex) {
738             logger.error(EELFLoggerDelegate.errorLogger,
739                     "Exception while acquireLockWithLease() in atomic get for key: " + primaryKey);
740             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
741             throw new MusicServiceException(
742                     "Cannot perform atomic get for key: " + primaryKey + " : " + ex.getMessage());
743         }
744         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
745             logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
746             String lockRef = lockId.substring(lockId.lastIndexOf("$"));
747             result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
748         } else {
749             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
750         }
751         try {
752             voluntaryReleaseLock(fullyQualifiedKey, lockId);
753         } catch (MusicLockingException ex) {
754             logger.info(EELFLoggerDelegate.applicationLogger,
755                     "Exception occured while deleting lock after atomic put for key: " + primaryKey);
756             throw new MusicLockingException(ex.getMessage());
757         }
758
759         return result;
760     }
761
762
763
764     /**
765      * @param lockName
766      * @return
767      */
768     public Map<String, Object> validateLock(String lockName) {
769         return MusicUtil.validateLock(lockName);
770     }
771
772     @Override
773     @Deprecated
774     public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
775         PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
776         return null;
777     }
778
779     @Override
780     public List<String> getLockQueue(String fullyQualifiedKey)
781         throws MusicServiceException, MusicQueryException, MusicLockingException {
782         String[] splitString = fullyQualifiedKey.split("\\.");
783         String keyspace = splitString[0];
784         String table = splitString[1];
785         String primaryKeyValue = splitString[2];
786
787         return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
788     }
789     @Override
790     public long getLockQueueSize(String fullyQualifiedKey)
791         throws MusicServiceException, MusicQueryException, MusicLockingException {
792         String[] splitString = fullyQualifiedKey.split("\\.");
793         String keyspace = splitString[0];
794         String table = splitString[1];
795         String primaryKeyValue = splitString[2];
796
797         return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
798     }
799
800     @Override
801     @Deprecated
802     public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
803             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
804         //deprecated
805         return null;
806     }
807
808 }