07864576cec09e70d418532794645b26f97bc718
[music.git] / src / main / java / org / onap / music / service / impl / MusicCassaCore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (c) 2018 IBM. 
7  * ===================================================================
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.service.impl;
27
28 import java.io.StringWriter;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.StringTokenizer;
32
33 import org.onap.music.datastore.MusicDataStore;
34 import org.onap.music.datastore.MusicDataStoreHandle;
35 import org.onap.music.datastore.PreparedQueryObject;
36 import org.onap.music.eelf.logging.EELFLoggerDelegate;
37 import org.onap.music.eelf.logging.format.AppMessages;
38 import org.onap.music.eelf.logging.format.ErrorSeverity;
39 import org.onap.music.eelf.logging.format.ErrorTypes;
40 import org.onap.music.exceptions.MusicLockingException;
41 import org.onap.music.exceptions.MusicQueryException;
42 import org.onap.music.exceptions.MusicServiceException;
43 import org.onap.music.lockingservice.cassandra.CassaLockStore;
44 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
45 import org.onap.music.lockingservice.cassandra.LockType;
46 import org.onap.music.lockingservice.cassandra.MusicLockState;
47 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
48 import org.onap.music.main.MusicUtil;
49 import org.onap.music.main.ResultType;
50 import org.onap.music.main.ReturnType;
51 import org.onap.music.service.MusicCoreService;
52
53 import com.datastax.driver.core.DataType;
54 import com.datastax.driver.core.ResultSet;
55 import com.datastax.driver.core.Row;
56 import com.datastax.driver.core.TableMetadata;
57
58 import org.onap.music.datastore.*;
59
60 public class MusicCassaCore implements MusicCoreService {
61
62     private static CassaLockStore mLockHandle = null;
63     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
64     private static MusicCassaCore musicCassaCoreInstance = null;
65
66     private MusicCassaCore() {
67         // not going to happen
68     }
69     
70     public static CassaLockStore getmLockHandle() {
71         return mLockHandle;
72     }
73
74     public static void setmLockHandle(CassaLockStore mLockHandle) {
75         MusicCassaCore.mLockHandle = mLockHandle;
76     }
77     
78     public static MusicCassaCore getInstance() {
79
80         if(musicCassaCoreInstance == null) {
81             musicCassaCoreInstance = new MusicCassaCore();
82         }
83         return musicCassaCoreInstance;
84     }
85
86
87
88
89     public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
90         logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
91         long start = System.currentTimeMillis();
92
93         if (mLockHandle == null) {
94             try {
95                 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
96             } catch (Exception e) {
97                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
98                 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
99             }
100         }
101         long end = System.currentTimeMillis();
102         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
103         return mLockHandle;
104     }
105
106     public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
107         return createLockReference(fullyQualifiedKey, LockType.WRITE);
108     }
109
110     public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
111         String[] splitString = fullyQualifiedKey.split("\\.");
112         String keyspace = splitString[0];
113         String table = splitString[1];
114         String lockName = splitString[2];
115
116         logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
117         long start = System.currentTimeMillis();
118         String lockReference = null;
119         
120         try {
121             lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype);
122         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
123             e.printStackTrace();
124             throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
125         } catch (Exception e) {
126             logger.error(EELFLoggerDelegate.applicationLogger, e);
127             throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
128         }
129         long end = System.currentTimeMillis();
130         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
131         return lockReference;
132     }
133
134
135     public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
136             throws MusicLockingException, MusicQueryException, MusicServiceException  {
137         evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
138         return acquireLock(fullyQualifiedKey, lockReference);
139     }
140
141     private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod)
142             throws MusicLockingException, MusicQueryException, MusicServiceException {
143         String[] splitString = fullyQualifiedKey.split("\\.");
144         String keyspace = splitString[0];
145         String table = splitString[1];
146         String primaryKeyValue = splitString[2];
147
148         LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
149
150         if (!currentLockHolderObject.getIsLockOwner()) { // no lock holder
151             return;
152         }
153         /*
154          * Release the lock of the previous holder if it has expired. if the update to the acquire time has
155          * not reached due to network delays, simply use the create time as the reference
156          */
157         long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.getAcquireTime()),
158                 Long.parseLong(currentLockHolderObject.getCreateTime()));
159         if ((System.currentTimeMillis() - referenceTime) > leasePeriod) {
160             forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.getLockRef() + "");
161             logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.getLockRef() + " forcibly released");
162         }
163     }
164
165     public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
166             throws MusicLockingException, MusicQueryException, MusicServiceException {
167         String[] splitString = lockId.split("\\.");
168         String keyspace = splitString[0].substring(1);//remove '$'
169         String table = splitString[1];
170         String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
171         String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
172         String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
173
174         LockObject lockInfo = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue, lockRef);
175
176         if (!lockInfo.getIsLockOwner()) {
177             return new ReturnType(ResultType.FAILURE, lockId + " is not a lock holder");//not top of the lock store q
178         }
179         
180         //check to see if the value of the key has to be synced in case there was a forceful release
181         String syncTable = keyspace+".unsyncedKeys_"+table;
182         String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
183         PreparedQueryObject readQueryObject = new PreparedQueryObject();
184         readQueryObject.appendQueryString(query);
185         ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
186         if (results.all().size() != 0) {
187             logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
188             try {
189                 syncQuorum(keyspace, table, primaryKeyValue);
190             } catch (Exception e) {
191                 StringWriter sw = new StringWriter();
192                     logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
193                         ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
194                 String exceptionAsString = sw.toString();
195                 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
196             }
197             String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
198             PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
199             deleteQueryObject.appendQueryString(cleanQuery);
200             MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
201         }
202
203         getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
204
205         return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
206     }
207
208
209
210     /**
211      *
212      * @param tableQueryObject
213      * @param consistency
214      * @return Boolean Indicates success or failure
215      * @throws MusicServiceException
216      *
217      *
218      */
219     public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
220             String consistency) throws MusicServiceException {
221         boolean result = false;
222
223         try {
224             // create shadow locking table
225             result = getLockingServiceHandle().createLockQueue(keyspace, table);
226             if (result == false)
227                 return ResultType.FAILURE;
228
229             result = false;
230
231             // create table to track unsynced_keys
232             table = "unsyncedKeys_" + table;
233
234             String tabQuery =
235                     "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " ( key text,PRIMARY KEY (key) );";
236             PreparedQueryObject queryObject = new PreparedQueryObject();
237
238             queryObject.appendQueryString(tabQuery);
239             result = false;
240             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
241
242             // create actual table
243             result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
244         } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
245             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
246                     ErrorTypes.MUSICSERVICEERROR);
247             throw new MusicServiceException(ex.getMessage());
248         }
249         return result ? ResultType.SUCCESS : ResultType.FAILURE;
250     }
251
252     private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
253         logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
254         PreparedQueryObject selectQuery = new PreparedQueryObject();
255         PreparedQueryObject updateQuery = new PreparedQueryObject();
256
257         // get the primary key d
258         TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
259         String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
260                                                                             // primary key
261         DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
262         Object cqlFormattedPrimaryKeyValue =
263                         MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
264
265         // get the row of data from a quorum
266         selectQuery.appendQueryString("SELECT *  FROM " + keyspace + "." + table + " WHERE "
267                         + primaryKeyName + "= ?" + ";");
268         selectQuery.addValue(cqlFormattedPrimaryKeyValue);
269         MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
270             cqlFormattedPrimaryKeyValue);
271     }
272
273     /**
274      *
275      * @param query
276      * @return ResultSet
277      */
278     public ResultSet quorumGet(PreparedQueryObject query) {
279         ResultSet results = null;
280         try {
281             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
282         } catch (MusicServiceException | MusicQueryException e) {
283             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR,
284                 ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
285
286         }
287         return results;
288     }
289
290     public String whoseTurnIsIt(String fullyQualifiedKey) {
291         String[] splitString = fullyQualifiedKey.split("\\.");
292         String keyspace = splitString[0];
293         String table = splitString[1];
294         String primaryKeyValue = splitString[2];
295         try {
296             LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
297             if (!lockOwner.getIsLockOwner()) {
298                 return null;
299             }
300             return "$" + fullyQualifiedKey + "$" + lockOwner.getLockRef();
301         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
302             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.LOCKINGERROR + fullyQualifiedKey,
303                     ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
304         }
305         return null;
306     }
307     
308     public List<String> getCurrentLockHolders(String fullyQualifiedKey) {
309         String[] splitString = fullyQualifiedKey.split("\\.");
310         String keyspace = splitString[0];
311         String table = splitString[1];
312         String primaryKeyValue = splitString[2];
313         try {
314                 return getLockingServiceHandle().getCurrentLockHolders(keyspace, table, primaryKeyValue);
315         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
316             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
317         }
318         return null;
319     }
320
321     /**
322      *
323      * @param lockReference
324      * @return
325      */
326     public static String getLockNameFromId(String lockReference) {
327         StringTokenizer st = new StringTokenizer(lockReference);
328         return st.nextToken("$");
329     }
330
331     @Override
332     public void destroyLockRef(String lockId) throws MusicLockingException {
333         long start = System.currentTimeMillis();
334         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
335         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
336         String[] splitString = fullyQualifiedKey.split("\\.");
337         String keyspace = splitString[0];
338         String table = splitString[1];
339         String primaryKeyValue = splitString[2];
340         try {
341             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
342         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
343             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
344                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
345             throw new MusicLockingException(e.getMessage());
346         }
347         long end = System.currentTimeMillis();
348         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
349     }
350
351     public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
352         long start = System.currentTimeMillis();
353         String[] splitString = fullyQualifiedKey.split("\\.");
354         String keyspace = splitString[0];
355         String table = splitString[1];
356         String primaryKeyValue = splitString[2];
357         try {
358             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
359         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
360             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
361                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
362             throw new MusicLockingException(e.getMessage());
363         }
364         long end = System.currentTimeMillis();
365         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
366         return new MusicLockState(LockStatus.UNLOCKED, "");
367     }
368
369     @Override
370     public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
371         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
372         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
373         if (voluntaryRelease) {
374             return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
375         } else {
376             return forciblyReleaseLock(fullyQualifiedKey, lockRef);
377         }
378     }
379
380     public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
381             throws MusicLockingException {
382         MusicLockState result = null;
383         try {
384             result = destroyLockRef(fullyQualifiedKey, lockReference);
385         } catch (Exception ex) {
386             logger.info(EELFLoggerDelegate.applicationLogger,
387                     "Exception in voluntaryReleaseLock() for " + fullyQualifiedKey + "ref: " + lockReference);
388             throw new MusicLockingException(ex.getMessage());
389         }
390         return result;
391     }
392
393     public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
394         String[] splitString = fullyQualifiedKey.split("\\.");
395         String keyspace = splitString[0];
396         String table = splitString[1];
397
398         //leave a signal that this key could potentially be unsynchronized
399         String syncTable = keyspace+".unsyncedKeys_"+table;
400         PreparedQueryObject queryObject = new PreparedQueryObject();
401         String values = "(?)";
402         queryObject.addValue(fullyQualifiedKey);
403         String insQuery = "insert into "+syncTable+" (key) values "+values+";";
404         queryObject.appendQueryString(insQuery);
405         try {
406             MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
407         } catch (Exception e) {
408             logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
409                         + e.getMessage(), e);
410         }
411
412         //now release the lock
413         return destroyLockRef(fullyQualifiedKey, lockReference);
414     }
415
416     /**
417      *
418      * @param lockName
419      * @throws MusicLockingException
420      */
421     @Deprecated
422     public  void deleteLock(String lockName) throws MusicLockingException {
423         throw new MusicLockingException("Depreciated Method Delete Lock");
424     }
425
426     // Prepared Query Additions.
427
428     /**
429      *
430      * @param queryObject
431      * @return ReturnType
432      * @throws MusicServiceException
433      */
434     public  ReturnType eventualPut(PreparedQueryObject queryObject) {
435         boolean result = false;
436         try {
437             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
438         } catch (MusicServiceException | MusicQueryException ex) {
439             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle "  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
440             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " " + ex);
441             return new ReturnType(ResultType.FAILURE, ex.getMessage());
442         }
443         if (result) {
444             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
445         } else {
446             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
447         }
448     }
449
450     /**
451      *
452      * @param queryObject
453      * @return ReturnType
454      * @throws MusicServiceException
455      */
456     public  ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
457         boolean result = false;
458         long guard = 0;
459         PreparedQueryObject getGaurd = new PreparedQueryObject();
460         getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
461         getGaurd.addValue(primaryKey);
462         try {
463             ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
464             Row row = getGaurdResult.one();
465             if (row != null) {
466                 guard = row.getLong("guard");
467                 long timeOfWrite = System.currentTimeMillis();
468                 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
469                 String query = queryObject.getQuery();
470                 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
471                     if (queryObject.getOperation().equalsIgnoreCase("delete"))
472                         query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
473                     else
474                         query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
475                 }
476                 queryObject.replaceQueryString(query);
477             }
478
479         } catch (MusicServiceException | MusicQueryException e) {
480             logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
481         }
482         try {
483             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
484         } catch (MusicServiceException | MusicQueryException ex) {
485             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
486                 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
487             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " ", ex);
488             return new ReturnType(ResultType.FAILURE, ex.getMessage());
489         }
490         if (result) {
491             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
492         } else {
493             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
494         }
495     }
496
497     /**
498      *
499      * @param keyspace
500      * @param table
501      * @param primaryKeyValue
502      * @param queryObject
503      * @param lockId
504      * @return
505      */
506     public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
507             PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
508         long start = System.currentTimeMillis();
509         try {
510             String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
511             if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
512                 return new ReturnType(ResultType.FAILURE,"Lock value '" + keyLock + "' and key value '"
513                 + primaryKeyValue + "' not match. Please check your values: " 
514                 + lockId + " .");
515             }
516             LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
517                     lockId.substring(lockId.lastIndexOf("$") + 1));
518
519             if ( lockObject == null ) {
520                 return new ReturnType(ResultType.FAILURE, lockId + " does not exist.");
521             } else if (!lockObject.getIsLockOwner()) {
522                 return new ReturnType(ResultType.FAILURE, lockId + " is not the lock holder");
523             } else if (lockObject.getLocktype() != LockType.WRITE) {
524                 return new ReturnType(ResultType.FAILURE,
525                         "Attempting to do write operation, but " + lockId + " is a read lock");
526             }
527
528             if (conditionInfo != null) {
529                 try {
530                     if (conditionInfo.testCondition() == false)
531                         return new ReturnType(ResultType.FAILURE, "Lock acquired but the condition is not true");
532                 } catch (Exception e) {
533                     logger.error(EELFLoggerDelegate.errorLogger, e);
534                     return new ReturnType(ResultType.FAILURE,
535                             "Exception thrown while checking the condition, check its sanctity:\n" + e.getMessage());
536                 }
537             }
538             String query = queryObject.getQuery();
539             long timeOfWrite = System.currentTimeMillis();
540             long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$") + 1));
541             long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
542             // TODO: use Statement instead of modifying query
543             if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
544                 if (queryObject.getOperation().equalsIgnoreCase("delete"))
545                     query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
546                 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
547                     query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
548                 else
549                     query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
550             }
551             queryObject.replaceQueryString(query);
552             MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
553             dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
554             long end = System.currentTimeMillis();
555             logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
556         } catch (MusicQueryException | MusicServiceException | MusicLockingException  e) {
557             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
558             return new ReturnType(ResultType.FAILURE,
559                 "Exception thrown while doing the critical put: "
560                 + e.getMessage());
561         }
562         return new ReturnType(ResultType.SUCCESS, "Update performed");
563     }
564
565
566     /**
567      *
568      * @param queryObject
569      * @param consistency
570      * @return Boolean Indicates success or failure
571      * @throws MusicServiceException
572      *
573      *
574      */
575     public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException,MusicQueryException {
576         // this is mainly for some functions like keyspace creation etc which does not
577         // really need the bells and whistles of Music locking.
578         boolean result = false;
579 //        try {
580         result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
581 //        } catch (MusicQueryException | MusicServiceException ex) {
582             // logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
583             //     ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
584 //            throw new MusicServiceException(ex.getMessage(),ex);
585 //        }
586         return result ? ResultType.SUCCESS : ResultType.FAILURE;
587     }
588
589     /**
590      * This method performs DDL operation on cassandra.
591      *
592      * @param queryObject query object containing prepared query and values
593      * @return ResultSet
594      * @throws MusicServiceException
595      */
596     public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
597         ResultSet results = null;
598         try {
599             results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
600         } catch (MusicQueryException | MusicServiceException e) {
601             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
602             throw new MusicServiceException(e.getMessage());
603         }
604         return results;
605     }
606
607     /**
608      * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
609      * is used to check if the resource is free.
610      *
611      * @param keyspace name of the keyspace
612      * @param table name of the table
613      * @param primaryKeyValue primary key value
614      * @param queryObject query object containing prepared query and values
615      * @param lockId lock ID to check if the resource is free to perform the operation.
616      * @return ResultSet
617      */
618     public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
619                     PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
620         ResultSet results = null;
621         String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
622         try {
623             if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
624                 throw new MusicLockingException("Lock value '" + keyLock + "' and key value '"
625                 + primaryKeyValue + "' do not match. Please check your values: " 
626                 + lockId + " .");
627             }
628             LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
629                 lockId.substring(lockId.lastIndexOf("$") + 1));
630             if (null == lockObject) {
631                 throw new MusicLockingException("No Lock Object. Please check if lock name or key is correct." 
632                     + lockId + " .");
633             }
634             if ( !lockObject.getIsLockOwner()) {
635                 return null;// not top of the lock store q
636             }
637             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
638         } catch ( MusicLockingException e ) {
639             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
640                     .WARN, ErrorTypes.MUSICSERVICEERROR);
641                 throw new MusicServiceException(
642                     "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());
643         } catch (MusicQueryException | MusicServiceException e) {
644             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
645                     .WARN, ErrorTypes.MUSICSERVICEERROR, e);
646                 throw new MusicServiceException(
647                     "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());    
648         }
649         return results;
650     }
651
652     /**
653      * This method performs DML operation on cassandra, when the lock of the dd is acquired.
654      *
655      * @param keyspaceName name of the keyspace
656      * @param tableName name of the table
657      * @param primaryKey primary key value
658      * @param queryObject query object containing prepared query and values
659      * @return ReturnType
660      * @throws MusicLockingException
661      * @throws MusicServiceException
662      * @throws MusicQueryException
663      */
664     public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
665             PreparedQueryObject queryObject, Condition conditionInfo)
666             throws MusicLockingException, MusicQueryException, MusicServiceException {
667         long start = System.currentTimeMillis();
668         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
669         String lockId = createLockReference(fullyQualifiedKey, LockType.WRITE);
670         long lockCreationTime = System.currentTimeMillis();
671         ReturnType lockAcqResult = null;
672         logger.info(EELFLoggerDelegate.applicationLogger,
673                 "***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : " + primaryKey);
674         logger.info(EELFLoggerDelegate.applicationLogger,
675                 "***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
676         if (conditionInfo != null) {
677             logger.info(EELFLoggerDelegate.applicationLogger,
678                     "***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
679         }
680         try {
681             lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
682         } catch (MusicLockingException ex) {
683             logger.error(EELFLoggerDelegate.errorLogger,
684                     "Exception while acquireLockWithLease() in atomic put for key: " + primaryKey);
685             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
686             throw new MusicServiceException(
687                     "Cannot perform atomic put for key: " + primaryKey + " : " + ex.getMessage());
688         }
689         long lockAcqTime = System.currentTimeMillis();
690
691         /*
692          * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { logger.info(EELFLoggerDelegate.
693          * applicationLogger,"unable to acquire lock, id " + lockId);
694          * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
695          */
696
697         logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
698         String lockRef = lockId.substring(lockId.lastIndexOf("$"));
699         ReturnType criticalPutResult = null;
700         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
701             criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef, conditionInfo);
702             long criticalPutTime = System.currentTimeMillis();
703             long lockDeleteTime = System.currentTimeMillis();
704             String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
705                     + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
706                     + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
707             criticalPutResult.setTimingInfo(timingInfo);
708         } else {
709             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
710             criticalPutResult = lockAcqResult;
711         }
712         try {
713             voluntaryReleaseLock(fullyQualifiedKey, lockId);
714         } catch (MusicLockingException ex) {
715             logger.info(EELFLoggerDelegate.applicationLogger,
716                     "Exception occured while deleting lock after atomic put for key: " + primaryKey);
717             criticalPutResult.setMessage(criticalPutResult.getMessage() + "Lock release failed");
718         }
719         return criticalPutResult;
720     }
721
722
723
724     /**
725      * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
726      *
727      * @param keyspaceName name of the keyspace
728      * @param tableName name of the table
729      * @param primaryKey primary key value
730      * @param queryObject query object containing prepared query and values
731      * @return ResultSet
732      * @throws MusicServiceException
733      * @throws MusicLockingException
734      * @throws MusicQueryException
735      */
736     public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
737             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
738         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
739         String lockId = createLockReference(fullyQualifiedKey, LockType.READ);
740         long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
741         ReturnType lockAcqResult = null;
742         ResultSet result = null;
743         logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());
744         try {
745             lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
746         } catch (MusicLockingException ex) {
747             logger.error(EELFLoggerDelegate.errorLogger,
748                     "Exception while acquireLockWithLease() in atomic get for key: " + primaryKey);
749             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
750             throw new MusicServiceException(
751                     "Cannot perform atomic get for key: " + primaryKey + " : " + ex.getMessage());
752         }
753         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
754             logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
755             String lockRef = lockId.substring(lockId.lastIndexOf("$"));
756             result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
757         } else {
758             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
759         }
760         try {
761             voluntaryReleaseLock(fullyQualifiedKey, lockId);
762         } catch (MusicLockingException ex) {
763             logger.info(EELFLoggerDelegate.applicationLogger,
764                     "Exception occured while deleting lock after atomic put for key: " + primaryKey);
765             throw new MusicLockingException(ex.getMessage());
766         }
767
768         return result;
769     }
770
771
772
773     /**
774      * @param lockName
775      * @return
776      */
777     public Map<String, Object> validateLock(String lockName) {
778         return MusicUtil.validateLock(lockName);
779     }
780
781     @Override
782     @Deprecated
783     public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
784         PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
785         return null;
786     }
787
788     @Override
789     public List<String> getLockQueue(String fullyQualifiedKey)
790         throws MusicServiceException, MusicQueryException, MusicLockingException {
791         String[] splitString = fullyQualifiedKey.split("\\.");
792         String keyspace = splitString[0];
793         String table = splitString[1];
794         String primaryKeyValue = splitString[2];
795
796         return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
797     }
798     @Override
799     public long getLockQueueSize(String fullyQualifiedKey)
800         throws MusicServiceException, MusicQueryException, MusicLockingException {
801         String[] splitString = fullyQualifiedKey.split("\\.");
802         String keyspace = splitString[0];
803         String table = splitString[1];
804         String primaryKeyValue = splitString[2];
805
806         return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
807     }
808
809     @Override
810     @Deprecated
811     public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
812             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
813         //deprecated
814         return null;
815     }
816
817
818 }