Changes Listed below:
[music.git] / src / main / java / org / onap / music / service / impl / MusicCassaCore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (c) 2018 IBM. 
7  * ===================================================================
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.service.impl;
27
28 import java.io.StringWriter;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.StringTokenizer;
32 import java.util.concurrent.TimeUnit;
33
34 import org.onap.music.datastore.MusicDataStore;
35 import org.onap.music.datastore.MusicDataStoreHandle;
36 import org.onap.music.datastore.PreparedQueryObject;
37 import org.onap.music.eelf.logging.EELFLoggerDelegate;
38 import org.onap.music.eelf.logging.format.AppMessages;
39 import org.onap.music.eelf.logging.format.ErrorSeverity;
40 import org.onap.music.eelf.logging.format.ErrorTypes;
41 import org.onap.music.exceptions.MusicLockingException;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.lockingservice.cassandra.CassaLockStore;
45 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
46 import org.onap.music.lockingservice.cassandra.MusicLockState;
47 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
48 import org.onap.music.main.MusicUtil;
49 import org.onap.music.main.ResultType;
50 import org.onap.music.main.ReturnType;
51 import org.onap.music.service.MusicCoreService;
52
53 import com.att.eelf.configuration.EELFLogger;
54 import com.datastax.driver.core.DataType;
55 import com.datastax.driver.core.ResultSet;
56 import com.datastax.driver.core.Row;
57 import com.datastax.driver.core.TableMetadata;
58
59 import org.onap.music.datastore.*;
60
61 public class MusicCassaCore implements MusicCoreService {
62
63     public static CassaLockStore mLockHandle = null;;
64     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
65     private static boolean unitTestRun=true;
66     private static MusicCassaCore musicCassaCoreInstance = null;
67
68     private MusicCassaCore() {
69
70     }
71     public static MusicCassaCore getInstance() {
72
73         if(musicCassaCoreInstance == null) {
74             musicCassaCoreInstance = new MusicCassaCore();
75         }
76         return musicCassaCoreInstance;
77     }
78
79     public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
80         logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
81         long start = System.currentTimeMillis();
82
83         if (mLockHandle == null) {
84             try {
85                 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
86             } catch (Exception e) {
87                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
88                 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
89             }
90         }
91         long end = System.currentTimeMillis();
92         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
93         return mLockHandle;
94     }
95
96
97
98     public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
99         String[] splitString = fullyQualifiedKey.split("\\.");
100         String keyspace = splitString[0];
101         String table = splitString[1];
102         String lockName = splitString[2];
103
104         logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
105         long start = System.currentTimeMillis();
106         String lockReference = null;
107         
108         try {
109             lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
110         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
111             e.printStackTrace();
112             throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
113         } catch (Exception e) {
114             logger.error(EELFLoggerDelegate.applicationLogger, e);
115             throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
116         }
117         long end = System.currentTimeMillis();
118         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
119         return lockReference;
120     }
121
122
123     public  ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException  {
124         evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
125             return acquireLock(fullyQualifiedKey, lockReference);
126     }
127
128     private  void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
129
130         String[] splitString = fullyQualifiedKey.split("\\.");
131         String keyspace = splitString[0];
132         String table = splitString[1];
133         String primaryKeyValue = splitString[2];
134
135         LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
136         
137         if (currentLockHolderObject==null) { //no lock holder
138             return;
139         }
140         /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the 
141          * reference*/
142         long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
143         if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
144             forciblyReleaseLock(fullyQualifiedKey,  currentLockHolderObject.lockRef+"");
145             logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.lockRef+" forcibly released");
146         }        
147     }
148     
149     private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
150         //return failure to lock holders too early or already evicted from the lock store
151         LockObject topOfLockStore = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
152         if (topOfLockStore==null) {
153             logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
154             return new ReturnType(ResultType.FAILURE, "No lock holder!");
155         }
156         
157         String topOfLockStoreS = topOfLockStore.lockRef;
158         long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
159         long lockReferenceL = Long.parseLong(lockReference);
160
161         if(lockReferenceL > topOfLockStoreL) {
162             logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
163             return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
164         }
165
166         if(lockReferenceL < topOfLockStoreL) {
167             logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
168             return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
169         }
170
171         return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
172     }
173
174     public  ReturnType acquireLock(String fullyQualifiedKey, String lockId)
175             throws MusicLockingException, MusicQueryException, MusicServiceException {
176         String[] splitString = lockId.split("\\.");
177         String keyspace = splitString[0].substring(1);//remove '$'
178         String table = splitString[1];
179         String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
180         String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
181         String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
182
183         ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockRef);
184
185         if(result.getResult().equals(ResultType.FAILURE))
186                 return result;//not top of the lock store q
187
188         //check to see if the value of the key has to be synced in case there was a forceful release
189         String syncTable = keyspace+".unsyncedKeys_"+table;
190         String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
191         PreparedQueryObject readQueryObject = new PreparedQueryObject();
192         readQueryObject.appendQueryString(query);
193         ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
194         if (results.all().size() != 0) {
195             logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
196             try {
197                 syncQuorum(keyspace, table, primaryKeyValue);
198             } catch (Exception e) {
199                 StringWriter sw = new StringWriter();
200                     logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
201                         ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
202                 String exceptionAsString = sw.toString();
203                 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
204             }
205             String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
206             PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
207             deleteQueryObject.appendQueryString(cleanQuery);
208             MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
209         }
210
211         getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
212
213         return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
214     }
215
216
217
218     /**
219      *
220      * @param tableQueryObject
221      * @param consistency
222      * @return Boolean Indicates success or failure
223      * @throws MusicServiceException
224      *
225      *
226      */
227     public  ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject, String consistency) throws MusicServiceException {
228             boolean result = false;
229
230             try {
231                 //create shadow locking table
232                 result = getLockingServiceHandle().createLockQueue(keyspace, table);
233                 if(result == false)
234                     return ResultType.FAILURE;
235
236                 result = false;
237
238                 //create table to track unsynced_keys
239                 table = "unsyncedKeys_"+table;
240
241                 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
242                         + " ( key text,PRIMARY KEY (key) );";
243                 PreparedQueryObject queryObject = new PreparedQueryObject();
244
245                 queryObject.appendQueryString(tabQuery);
246                 result = false;
247                 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
248
249
250                 //create actual table
251                 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
252             } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
253                 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
254                     .WARN, ErrorTypes.MUSICSERVICEERROR, ex);
255                 throw new MusicServiceException(ex.getMessage());
256             }
257             return result?ResultType.SUCCESS:ResultType.FAILURE;
258     }
259
260     private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
261         logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
262         PreparedQueryObject selectQuery = new PreparedQueryObject();
263         PreparedQueryObject updateQuery = new PreparedQueryObject();
264
265         // get the primary key d
266         TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
267         String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
268                                                                             // primary key
269         DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
270         Object cqlFormattedPrimaryKeyValue =
271                         MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
272
273         // get the row of data from a quorum
274         selectQuery.appendQueryString("SELECT *  FROM " + keyspace + "." + table + " WHERE "
275                         + primaryKeyName + "= ?" + ";");
276         selectQuery.addValue(cqlFormattedPrimaryKeyValue);
277         MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
278             cqlFormattedPrimaryKeyValue);
279
280     }
281
282     /**
283      *
284      * @param query
285      * @return ResultSet
286      */
287     public  ResultSet quorumGet(PreparedQueryObject query) {
288         ResultSet results = null;
289         try {
290             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
291         } catch (MusicServiceException | MusicQueryException e) {
292             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity
293                 .MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
294
295         }
296         return results;
297
298     }
299
300     /**
301      *
302      * @param fullyQualifiedKey lockName
303      * @return
304      */
305     public  String whoseTurnIsIt(String fullyQualifiedKey) {
306         String[] splitString = fullyQualifiedKey.split("\\.");
307         String keyspace = splitString[0];
308         String table = splitString[1];
309         String primaryKeyValue = splitString[2];
310         try {
311             LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
312             if (lockOwner==null) {
313                 return "No lock holder!";
314             }
315             return "$" + fullyQualifiedKey + "$" 
316                     + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
317         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
318              logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey
319                  ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
320         }
321         return null;
322     }
323
324     /**
325      *
326      * @param lockReference
327      * @return
328      */
329     public static String getLockNameFromId(String lockReference) {
330         StringTokenizer st = new StringTokenizer(lockReference);
331         return st.nextToken("$");
332     }
333
334     @Override
335     public void destroyLockRef(String lockId) throws MusicLockingException {
336         long start = System.currentTimeMillis();
337         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
338         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
339         String[] splitString = fullyQualifiedKey.split("\\.");
340         String keyspace = splitString[0];
341         String table = splitString[1];
342         String primaryKeyValue = splitString[2];
343         try {
344             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
345         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
346             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
347                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
348             throw new MusicLockingException(e.getMessage());
349         }
350         long end = System.currentTimeMillis();
351         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
352     }
353
354     public  MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
355         long start = System.currentTimeMillis();
356         String[] splitString = fullyQualifiedKey.split("\\.");
357         String keyspace = splitString[0];
358         String table = splitString[1];
359         String primaryKeyValue = splitString[2];
360         try {
361             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
362         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
363             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
364                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
365             throw new MusicLockingException(e.getMessage());
366         }
367         long end = System.currentTimeMillis();
368         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
369         return new MusicLockState(LockStatus.UNLOCKED, "");
370     }
371
372     @Override
373     public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
374         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
375         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
376         if (voluntaryRelease) {
377             return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
378         } else {
379             return forciblyReleaseLock(fullyQualifiedKey, lockRef);
380         }
381     }
382
383     public   MusicLockState  voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
384         MusicLockState result = null;
385         try {
386            result = destroyLockRef(fullyQualifiedKey, lockReference);
387         }
388         catch(Exception ex) {
389               logger.info(EELFLoggerDelegate.applicationLogger,"Exception in voluntaryReleaseLock() for "+ fullyQualifiedKey + "ref: " + lockReference);
390               throw new MusicLockingException(ex.getMessage());
391             }
392         return result;
393     }
394
395     public  MusicLockState  forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
396         String[] splitString = fullyQualifiedKey.split("\\.");
397         String keyspace = splitString[0];
398         String table = splitString[1];
399
400             //leave a signal that this key could potentially be unsynchronized
401         String syncTable = keyspace+".unsyncedKeys_"+table;
402         PreparedQueryObject queryObject = new PreparedQueryObject();
403         String values = "(?)";
404         queryObject.addValue(fullyQualifiedKey);
405         String insQuery = "insert into "+syncTable+" (key) values "+values+";";
406         queryObject.appendQueryString(insQuery);
407         try {
408             MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
409         } catch (Exception e) {
410             logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
411                         + e.getMessage(), e);
412         }
413
414         //now release the lock
415         return destroyLockRef(fullyQualifiedKey, lockReference);
416     }
417
418     /**
419      *
420      * @param lockName
421      * @throws MusicLockingException
422      */
423     @Deprecated
424     public  void deleteLock(String lockName) throws MusicLockingException {
425         throw new MusicLockingException("Depreciated Method Delete Lock");
426     }
427
428     // Prepared Query Additions.
429
430     /**
431      *
432      * @param queryObject
433      * @return ReturnType
434      * @throws MusicServiceException
435      */
436     public  ReturnType eventualPut(PreparedQueryObject queryObject) {
437         boolean result = false;
438         try {
439             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
440         } catch (MusicServiceException | MusicQueryException ex) {
441             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle "  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
442             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " " + ex);
443             return new ReturnType(ResultType.FAILURE, ex.getMessage());
444         }
445         if (result) {
446             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
447         } else {
448             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
449         }
450     }
451
452     /**
453      *
454      * @param queryObject
455      * @return ReturnType
456      * @throws MusicServiceException
457      */
458     public  ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
459         boolean result = false;
460         long guard = 0;
461         PreparedQueryObject getGaurd = new PreparedQueryObject();
462         getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
463         getGaurd.addValue(primaryKey);
464         try {
465             ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
466             Row row = getGaurdResult.one();
467             if (row != null) {
468                 guard = row.getLong("guard");
469                 long timeOfWrite = System.currentTimeMillis();
470                 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
471                 String query = queryObject.getQuery();
472                 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
473                     if (queryObject.getOperation().equalsIgnoreCase("delete"))
474                         query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
475                     else
476                         query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
477                 }
478                 queryObject.replaceQueryString(query);
479             }
480
481         } catch (MusicServiceException | MusicQueryException e) {
482             logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
483         }
484         try {
485             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
486         } catch (MusicServiceException | MusicQueryException ex) {
487             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
488                 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
489             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " ", ex);
490             return new ReturnType(ResultType.FAILURE, ex.getMessage());
491         }
492         if (result) {
493             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
494         } else {
495             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
496         }
497     }
498
499     /**
500      *
501      * @param keyspace
502      * @param table
503      * @param primaryKeyValue
504      * @param queryObject
505      * @param lockId
506      * @return
507      */
508     public  ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
509                     PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
510         long start = System.currentTimeMillis();
511         try {
512         ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
513                 lockId.substring(lockId.lastIndexOf("$")+1));
514         if(result.getResult().equals(ResultType.FAILURE))
515                 return result;//not top of the lock store q
516
517         if (conditionInfo != null)
518             try {
519                 if (conditionInfo.testCondition() == false)
520                     return new ReturnType(ResultType.FAILURE,
521                         "Lock acquired but the condition is not true");
522             } catch (Exception e) {
523                 logger.error(EELFLoggerDelegate.errorLogger, e);
524                 return new ReturnType(ResultType.FAILURE,
525                     "Exception thrown while checking the condition, check its sanctity:\n"
526                     + e.getMessage());
527             }
528             String query = queryObject.getQuery();
529             long timeOfWrite = System.currentTimeMillis();
530             long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
531             long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
532             // TODO: use Statement instead of modifying query
533             if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
534                 if (queryObject.getOperation().equalsIgnoreCase("delete"))
535                     query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
536                 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
537                     query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
538                 else
539                     query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
540             }
541             queryObject.replaceQueryString(query);
542             MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
543             dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
544             long end = System.currentTimeMillis();
545             logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
546         }catch (MusicQueryException | MusicServiceException | MusicLockingException  e) {
547             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
548             return new ReturnType(ResultType.FAILURE,
549                 "Exception thrown while doing the critical put\n"
550                 + e.getMessage());
551         }
552         return new ReturnType(ResultType.SUCCESS, "Update performed");
553     }
554
555
556     /**
557      *
558      * @param queryObject
559      * @param consistency
560      * @return Boolean Indicates success or failure
561      * @throws MusicServiceException
562      *
563      *
564      */
565     public  ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
566         // this is mainly for some functions like keyspace creation etc which does not
567         // really need the bells and whistles of Music locking.
568         boolean result = false;
569         try {
570             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
571         } catch (MusicQueryException | MusicServiceException ex) {
572             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
573                     ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
574             throw new MusicServiceException(ex.getMessage());
575         }
576         return result ? ResultType.SUCCESS : ResultType.FAILURE;
577     }
578
579     /**
580      * This method performs DDL operation on cassandra.
581      *
582      * @param queryObject query object containing prepared query and values
583      * @return ResultSet
584      * @throws MusicServiceException
585      */
586     public  ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
587         ResultSet results = null;
588         try {
589             results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
590         } catch (MusicQueryException | MusicServiceException e) {
591             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
592             throw new MusicServiceException(e.getMessage());
593         }
594         return results;
595     }
596
597     /**
598      * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
599      * is used to check if the resource is free.
600      *
601      * @param keyspace name of the keyspace
602      * @param table name of the table
603      * @param primaryKeyValue primary key value
604      * @param queryObject query object containing prepared query and values
605      * @param lockId lock ID to check if the resource is free to perform the operation.
606      * @return ResultSet
607      */
608     public  ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
609                     PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
610         ResultSet results = null;
611
612         try {
613             ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
614                     lockId.substring(lockId.lastIndexOf("$")+1));
615             if(result.getResult().equals(ResultType.FAILURE))
616                     return null;//not top of the lock store q
617                 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
618         } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
619                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
620                     .WARN, ErrorTypes.MUSICSERVICEERROR, e);
621         }
622         return results;
623     }
624
625     /**
626      * This method performs DML operation on cassandra, when the lock of the dd is acquired.
627      *
628      * @param keyspaceName name of the keyspace
629      * @param tableName name of the table
630      * @param primaryKey primary key value
631      * @param queryObject query object containing prepared query and values
632      * @return ReturnType
633      * @throws MusicLockingException
634      * @throws MusicServiceException
635      * @throws MusicQueryException
636      */
637     public  ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
638                     PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
639         long start = System.currentTimeMillis();
640         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
641         String lockId = createLockReference(fullyQualifiedKey);
642         long lockCreationTime = System.currentTimeMillis();
643         ReturnType lockAcqResult = null;
644         logger.info(EELFLoggerDelegate.applicationLogger,"***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : "+ primaryKey);
645         logger.info(EELFLoggerDelegate.applicationLogger,"***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
646         if(conditionInfo!=null)
647         logger.info(EELFLoggerDelegate.applicationLogger,"***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
648         try {      
649         lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId,MusicUtil.getDefaultLockLeasePeriod());
650         }catch(MusicLockingException ex) {
651             logger.error(EELFLoggerDelegate.errorLogger,"Exception while acquireLockWithLease() in atomic put for key: "+primaryKey);
652             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage());
653             throw new MusicServiceException("Cannot perform atomic put for key: "+primaryKey+ " : "+ ex.getMessage());
654         }
655         long lockAcqTime = System.currentTimeMillis();
656
657         /*
658          * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
659          * logger.info(EELFLoggerDelegate.
660          * applicationLogger,"unable to acquire lock, id " + lockId);
661          * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
662          */
663
664         logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
665         String lockRef = lockId.substring(lockId.lastIndexOf("$"));
666         ReturnType criticalPutResult = null ;
667             if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
668                 criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef,
669                         conditionInfo);
670                 long criticalPutTime = System.currentTimeMillis();
671                 long lockDeleteTime = System.currentTimeMillis();
672                 String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
673                         + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
674                         + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
675                 criticalPutResult.setTimingInfo(timingInfo);
676             }else {
677                 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
678                 criticalPutResult = lockAcqResult;
679             }
680             try {
681             voluntaryReleaseLock(fullyQualifiedKey, lockId);
682             
683         } catch (MusicLockingException ex) {
684             logger.info(EELFLoggerDelegate.applicationLogger,"Exception occured while deleting lock after atomic put for key: "+primaryKey);
685             criticalPutResult.setMessage(criticalPutResult.getMessage()+ "Lock release failed");
686         } 
687         return criticalPutResult;
688     }
689
690
691
692     /**
693      * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
694      *
695      * @param keyspaceName name of the keyspace
696      * @param tableName name of the table
697      * @param primaryKey primary key value
698      * @param queryObject query object containing prepared query and values
699      * @return ResultSet
700      * @throws MusicServiceException
701      * @throws MusicLockingException
702      * @throws MusicQueryException
703      */
704     public  ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
705                     PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
706         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
707         String lockId = createLockReference(fullyQualifiedKey);
708         long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
709         ReturnType lockAcqResult = null;
710         ResultSet result =null;
711         logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock for atomicGet() : " + queryObject.getQuery());
712         try {
713         lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId,MusicUtil.getDefaultLockLeasePeriod());
714         }catch(MusicLockingException ex) {
715             logger.error(EELFLoggerDelegate.errorLogger,"Exception while acquireLockWithLease() in atomic get for key: "+primaryKey);
716             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage());
717             throw new MusicServiceException("Cannot perform atomic get for key: "+primaryKey+ " : "+ ex.getMessage());
718         }
719         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
720             logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
721             String lockRef = lockId.substring(lockId.lastIndexOf("$"));
722             result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
723         } else {
724             logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
725             
726         }
727         try {
728             voluntaryReleaseLock(fullyQualifiedKey,lockId);
729         }catch(MusicLockingException ex) {
730             logger.info(EELFLoggerDelegate.applicationLogger,"Exception occured while deleting lock after atomic put for key: "+primaryKey);
731             throw new MusicLockingException(ex.getMessage());
732         }
733         
734         return result;
735     }
736
737
738
739     /**
740      * @param lockName
741      * @return
742      */
743     public Map<String, Object> validateLock(String lockName) {
744         return MusicUtil.validateLock(lockName);
745     }
746
747     @Override
748     @Deprecated
749     public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
750         PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
751         return null;
752     }
753
754     @Override
755     public List<String> getLockQueue(String fullyQualifiedKey)
756         throws MusicServiceException, MusicQueryException, MusicLockingException {
757         String[] splitString = fullyQualifiedKey.split("\\.");
758         String keyspace = splitString[0];
759         String table = splitString[1];
760         String primaryKeyValue = splitString[2];
761
762         return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
763     }
764     @Override
765     public long getLockQueueSize(String fullyQualifiedKey)
766         throws MusicServiceException, MusicQueryException, MusicLockingException {
767         String[] splitString = fullyQualifiedKey.split("\\.");
768         String keyspace = splitString[0];
769         String table = splitString[1];
770         String primaryKeyValue = splitString[2];
771
772         return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
773     }
774
775     @Override
776     @Deprecated
777     public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
778             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
779         //deprecated
780         return null;
781     }
782
783 }