CADI and a few small updates.
[music.git] / src / main / java / org / onap / music / service / impl / MusicCassaCore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (c) 2018 IBM. 
7  * ===================================================================
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.service.impl;
27
28 import java.io.StringWriter;
29 import java.util.List;
30 import java.util.Map;
31 import java.util.StringTokenizer;
32 import java.util.concurrent.TimeUnit;
33
34 import org.onap.music.datastore.MusicDataStore;
35 import org.onap.music.datastore.MusicDataStoreHandle;
36 import org.onap.music.datastore.PreparedQueryObject;
37 import org.onap.music.eelf.logging.EELFLoggerDelegate;
38 import org.onap.music.eelf.logging.format.AppMessages;
39 import org.onap.music.eelf.logging.format.ErrorSeverity;
40 import org.onap.music.eelf.logging.format.ErrorTypes;
41 import org.onap.music.exceptions.MusicLockingException;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.lockingservice.cassandra.CassaLockStore;
45 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
46 import org.onap.music.lockingservice.cassandra.LockType;
47 import org.onap.music.lockingservice.cassandra.MusicLockState;
48 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
49 import org.onap.music.main.MusicUtil;
50 import org.onap.music.main.ResultType;
51 import org.onap.music.main.ReturnType;
52 import org.onap.music.service.MusicCoreService;
53
54 import com.att.eelf.configuration.EELFLogger;
55 import com.datastax.driver.core.DataType;
56 import com.datastax.driver.core.ResultSet;
57 import com.datastax.driver.core.Row;
58 import com.datastax.driver.core.TableMetadata;
59
60 import org.onap.music.datastore.*;
61
62 public class MusicCassaCore implements MusicCoreService {
63
64     public static CassaLockStore mLockHandle = null;;
65     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
66     private static boolean unitTestRun=true;
67     private static MusicCassaCore musicCassaCoreInstance = null;
68
69     private MusicCassaCore() {
70
71     }
72     public static MusicCassaCore getInstance() {
73
74         if(musicCassaCoreInstance == null) {
75             musicCassaCoreInstance = new MusicCassaCore();
76         }
77         return musicCassaCoreInstance;
78     }
79
80     public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
81         logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
82         long start = System.currentTimeMillis();
83
84         if (mLockHandle == null) {
85             try {
86                 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
87             } catch (Exception e) {
88                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
89                 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
90             }
91         }
92         long end = System.currentTimeMillis();
93         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
94         return mLockHandle;
95     }
96
97     public String createLockReference(String fullyQualifiedKey) throws MusicLockingException {
98         return createLockReference(fullyQualifiedKey, LockType.WRITE);
99     }
100
101     public String createLockReference(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
102         String[] splitString = fullyQualifiedKey.split("\\.");
103         String keyspace = splitString[0];
104         String table = splitString[1];
105         String lockName = splitString[2];
106
107         logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
108         long start = System.currentTimeMillis();
109         String lockReference = null;
110         
111         try {
112             lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype);
113         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
114             e.printStackTrace();
115             throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
116         } catch (Exception e) {
117             logger.error(EELFLoggerDelegate.applicationLogger, e);
118             throw new MusicLockingException("Unable to create lock reference. " + e.getMessage());
119         }
120         long end = System.currentTimeMillis();
121         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
122         return lockReference;
123     }
124
125
126     public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
127             throws MusicLockingException, MusicQueryException, MusicServiceException  {
128          evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
129          return acquireLock(fullyQualifiedKey, lockReference);
130     }
131
132     private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod)
133             throws MusicLockingException, MusicQueryException, MusicServiceException {
134         String[] splitString = fullyQualifiedKey.split("\\.");
135         String keyspace = splitString[0];
136         String table = splitString[1];
137         String primaryKeyValue = splitString[2];
138
139         LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
140
141         if (!currentLockHolderObject.getIsLockOwner()) { // no lock holder
142             return;
143         }
144         /*
145          * Release the lock of the previous holder if it has expired. if the update to the acquire time has
146          * not reached due to network delays, simply use the create time as the reference
147          */
148         long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.getAcquireTime()),
149                 Long.parseLong(currentLockHolderObject.getCreateTime()));
150         if ((System.currentTimeMillis() - referenceTime) > leasePeriod) {
151             forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.getLockRef() + "");
152             logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.getLockRef() + " forcibly released");
153         }
154     }
155
156     public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
157             throws MusicLockingException, MusicQueryException, MusicServiceException {
158         String[] splitString = lockId.split("\\.");
159         String keyspace = splitString[0].substring(1);//remove '$'
160         String table = splitString[1];
161         String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
162         String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
163         String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
164
165         LockObject lockInfo = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue, lockRef);
166
167         if (!lockInfo.getIsLockOwner()) {
168             return new ReturnType(ResultType.FAILURE, lockId + " is not a lock holder");//not top of the lock store q
169         }
170    
171         //check to see if the value of the key has to be synced in case there was a forceful release
172         String syncTable = keyspace+".unsyncedKeys_"+table;
173         String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
174         PreparedQueryObject readQueryObject = new PreparedQueryObject();
175         readQueryObject.appendQueryString(query);
176         ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
177         if (results.all().size() != 0) {
178             logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
179             try {
180                 syncQuorum(keyspace, table, primaryKeyValue);
181             } catch (Exception e) {
182                 StringWriter sw = new StringWriter();
183                     logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
184                         ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
185                 String exceptionAsString = sw.toString();
186                 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
187             }
188             String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
189             PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
190             deleteQueryObject.appendQueryString(cleanQuery);
191             MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
192         }
193
194         getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
195
196         return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
197     }
198
199
200
201     /**
202      *
203      * @param tableQueryObject
204      * @param consistency
205      * @return Boolean Indicates success or failure
206      * @throws MusicServiceException
207      *
208      *
209      */
210     public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
211             String consistency) throws MusicServiceException {
212         boolean result = false;
213
214         try {
215             // create shadow locking table
216             result = getLockingServiceHandle().createLockQueue(keyspace, table);
217             if (result == false)
218                 return ResultType.FAILURE;
219
220             result = false;
221
222             // create table to track unsynced_keys
223             table = "unsyncedKeys_" + table;
224
225             String tabQuery =
226                     "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " ( key text,PRIMARY KEY (key) );";
227             PreparedQueryObject queryObject = new PreparedQueryObject();
228
229             queryObject.appendQueryString(tabQuery);
230             result = false;
231             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
232
233             // create actual table
234             result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
235         } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
236             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
237                     ErrorTypes.MUSICSERVICEERROR);
238             throw new MusicServiceException(ex.getMessage());
239         }
240         return result ? ResultType.SUCCESS : ResultType.FAILURE;
241     }
242
243     private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
244         logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
245         PreparedQueryObject selectQuery = new PreparedQueryObject();
246         PreparedQueryObject updateQuery = new PreparedQueryObject();
247
248         // get the primary key d
249         TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
250         String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
251                                                                             // primary key
252         DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
253         Object cqlFormattedPrimaryKeyValue =
254                         MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
255
256         // get the row of data from a quorum
257         selectQuery.appendQueryString("SELECT *  FROM " + keyspace + "." + table + " WHERE "
258                         + primaryKeyName + "= ?" + ";");
259         selectQuery.addValue(cqlFormattedPrimaryKeyValue);
260         MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
261             cqlFormattedPrimaryKeyValue);
262     }
263
264     /**
265      *
266      * @param query
267      * @return ResultSet
268      */
269     public ResultSet quorumGet(PreparedQueryObject query) {
270         ResultSet results = null;
271         try {
272             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
273         } catch (MusicServiceException | MusicQueryException e) {
274             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR,
275                 ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
276
277         }
278         return results;
279     }
280
281     public String whoseTurnIsIt(String fullyQualifiedKey) {
282         String[] splitString = fullyQualifiedKey.split("\\.");
283         String keyspace = splitString[0];
284         String table = splitString[1];
285         String primaryKeyValue = splitString[2];
286         try {
287             LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
288             if (!lockOwner.getIsLockOwner()) {
289                 return "No lock holder!";
290             }
291             return "$" + fullyQualifiedKey + "$" + lockOwner.getLockRef();
292         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
293             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.LOCKINGERROR + fullyQualifiedKey,
294                     ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
295         }
296         return null;
297     }
298     
299     public List<String> getCurrentLockHolders(String fullyQualifiedKey) {
300         String[] splitString = fullyQualifiedKey.split("\\.");
301         String keyspace = splitString[0];
302         String table = splitString[1];
303         String primaryKeyValue = splitString[2];
304         try {
305                 return getLockingServiceHandle().getCurrentLockHolders(keyspace, table, primaryKeyValue);
306         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
307             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
308         }
309         return null;
310     }
311
312     /**
313      *
314      * @param lockReference
315      * @return
316      */
317     public static String getLockNameFromId(String lockReference) {
318         StringTokenizer st = new StringTokenizer(lockReference);
319         return st.nextToken("$");
320     }
321
322     @Override
323     public void destroyLockRef(String lockId) throws MusicLockingException {
324         long start = System.currentTimeMillis();
325         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
326         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
327         String[] splitString = fullyQualifiedKey.split("\\.");
328         String keyspace = splitString[0];
329         String table = splitString[1];
330         String primaryKeyValue = splitString[2];
331         try {
332             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
333         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
334             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
335                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
336             throw new MusicLockingException(e.getMessage());
337         }
338         long end = System.currentTimeMillis();
339         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
340     }
341
342     public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
343         long start = System.currentTimeMillis();
344         String[] splitString = fullyQualifiedKey.split("\\.");
345         String keyspace = splitString[0];
346         String table = splitString[1];
347         String primaryKeyValue = splitString[2];
348         try {
349             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
350         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
351             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
352                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
353             throw new MusicLockingException(e.getMessage());
354         }
355         long end = System.currentTimeMillis();
356         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
357         return new MusicLockState(LockStatus.UNLOCKED, "");
358     }
359
360     @Override
361     public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
362         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
363         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
364         if (voluntaryRelease) {
365             return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
366         } else {
367             return forciblyReleaseLock(fullyQualifiedKey, lockRef);
368         }
369     }
370
371     public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
372             throws MusicLockingException {
373         MusicLockState result = null;
374         try {
375             result = destroyLockRef(fullyQualifiedKey, lockReference);
376         } catch (Exception ex) {
377             logger.info(EELFLoggerDelegate.applicationLogger,
378                     "Exception in voluntaryReleaseLock() for " + fullyQualifiedKey + "ref: " + lockReference);
379             throw new MusicLockingException(ex.getMessage());
380         }
381         return result;
382     }
383
384     public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
385         String[] splitString = fullyQualifiedKey.split("\\.");
386         String keyspace = splitString[0];
387         String table = splitString[1];
388
389         //leave a signal that this key could potentially be unsynchronized
390         String syncTable = keyspace+".unsyncedKeys_"+table;
391         PreparedQueryObject queryObject = new PreparedQueryObject();
392         String values = "(?)";
393         queryObject.addValue(fullyQualifiedKey);
394         String insQuery = "insert into "+syncTable+" (key) values "+values+";";
395         queryObject.appendQueryString(insQuery);
396         try {
397             MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
398         } catch (Exception e) {
399             logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
400                         + e.getMessage(), e);
401         }
402
403         //now release the lock
404         return destroyLockRef(fullyQualifiedKey, lockReference);
405     }
406
407     /**
408      *
409      * @param lockName
410      * @throws MusicLockingException
411      */
412     @Deprecated
413     public  void deleteLock(String lockName) throws MusicLockingException {
414         throw new MusicLockingException("Depreciated Method Delete Lock");
415     }
416
417     // Prepared Query Additions.
418
419     /**
420      *
421      * @param queryObject
422      * @return ReturnType
423      * @throws MusicServiceException
424      */
425     public  ReturnType eventualPut(PreparedQueryObject queryObject) {
426         boolean result = false;
427         try {
428             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
429         } catch (MusicServiceException | MusicQueryException ex) {
430             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle "  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
431             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " " + ex);
432             return new ReturnType(ResultType.FAILURE, ex.getMessage());
433         }
434         if (result) {
435             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
436         } else {
437             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
438         }
439     }
440
441     /**
442      *
443      * @param queryObject
444      * @return ReturnType
445      * @throws MusicServiceException
446      */
447     public  ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
448         boolean result = false;
449         long guard = 0;
450         PreparedQueryObject getGaurd = new PreparedQueryObject();
451         getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
452         getGaurd.addValue(primaryKey);
453         try {
454             ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
455             Row row = getGaurdResult.one();
456             if (row != null) {
457                 guard = row.getLong("guard");
458                 long timeOfWrite = System.currentTimeMillis();
459                 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
460                 String query = queryObject.getQuery();
461                 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
462                     if (queryObject.getOperation().equalsIgnoreCase("delete"))
463                         query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
464                     else
465                         query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
466                 }
467                 queryObject.replaceQueryString(query);
468             }
469
470         } catch (MusicServiceException | MusicQueryException e) {
471             logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
472         }
473         try {
474             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
475         } catch (MusicServiceException | MusicQueryException ex) {
476             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
477                 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
478             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " ", ex);
479             return new ReturnType(ResultType.FAILURE, ex.getMessage());
480         }
481         if (result) {
482             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
483         } else {
484             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
485         }
486     }
487
488     /**
489      *
490      * @param keyspace
491      * @param table
492      * @param primaryKeyValue
493      * @param queryObject
494      * @param lockId
495      * @return
496      */
497     public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
498             PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
499         long start = System.currentTimeMillis();
500         try {
501             LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
502                     lockId.substring(lockId.lastIndexOf("$") + 1));
503
504             if (!lockObject.getIsLockOwner()) {
505                 return new ReturnType(ResultType.FAILURE, lockId + " is not the lock holder");
506             } else if (lockObject.getLocktype() != LockType.WRITE) {
507                 return new ReturnType(ResultType.FAILURE,
508                         "Attempting to do write operation, but " + lockId + " is a write lock");
509             }
510
511             if (conditionInfo != null) {
512                 try {
513                     if (conditionInfo.testCondition() == false)
514                         return new ReturnType(ResultType.FAILURE, "Lock acquired but the condition is not true");
515                 } catch (Exception e) {
516                     logger.error(EELFLoggerDelegate.errorLogger, e);
517                     return new ReturnType(ResultType.FAILURE,
518                             "Exception thrown while checking the condition, check its sanctity:\n" + e.getMessage());
519                 }
520             }
521             String query = queryObject.getQuery();
522             long timeOfWrite = System.currentTimeMillis();
523             long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$") + 1));
524             long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
525             // TODO: use Statement instead of modifying query
526             if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
527                 if (queryObject.getOperation().equalsIgnoreCase("delete"))
528                     query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
529                 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
530                     query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
531                 else
532                     query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
533             }
534             queryObject.replaceQueryString(query);
535             MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
536             dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
537             long end = System.currentTimeMillis();
538             logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
539         }catch (MusicQueryException | MusicServiceException | MusicLockingException  e) {
540             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
541             return new ReturnType(ResultType.FAILURE,
542                 "Exception thrown while doing the critical put\n"
543                 + e.getMessage());
544         }
545         return new ReturnType(ResultType.SUCCESS, "Update performed");
546     }
547
548
549     /**
550      *
551      * @param queryObject
552      * @param consistency
553      * @return Boolean Indicates success or failure
554      * @throws MusicServiceException
555      *
556      *
557      */
558     public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
559         // this is mainly for some functions like keyspace creation etc which does not
560         // really need the bells and whistles of Music locking.
561         boolean result = false;
562         try {
563             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
564         } catch (MusicQueryException | MusicServiceException ex) {
565             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
566                     ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
567             throw new MusicServiceException(ex.getMessage());
568         }
569         return result ? ResultType.SUCCESS : ResultType.FAILURE;
570     }
571
572     /**
573      * This method performs DDL operation on cassandra.
574      *
575      * @param queryObject query object containing prepared query and values
576      * @return ResultSet
577      * @throws MusicServiceException
578      */
579     public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
580         ResultSet results = null;
581         try {
582             results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
583         } catch (MusicQueryException | MusicServiceException e) {
584             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
585             throw new MusicServiceException(e.getMessage());
586         }
587         return results;
588     }
589
590     /**
591      * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
592      * is used to check if the resource is free.
593      *
594      * @param keyspace name of the keyspace
595      * @param table name of the table
596      * @param primaryKeyValue primary key value
597      * @param queryObject query object containing prepared query and values
598      * @param lockId lock ID to check if the resource is free to perform the operation.
599      * @return ResultSet
600      */
601     public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
602                     PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
603         ResultSet results = null;
604
605         try {
606             LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
607                     lockId.substring(lockId.lastIndexOf("$") + 1));
608             if (!lockObject.getIsLockOwner()) {
609                 return null;// not top of the lock store q
610             }
611             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
612         } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
613                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
614                     .WARN, ErrorTypes.MUSICSERVICEERROR, e);
615         }
616         return results;
617     }
618
619     /**
620      * This method performs DML operation on cassandra, when the lock of the dd is acquired.
621      *
622      * @param keyspaceName name of the keyspace
623      * @param tableName name of the table
624      * @param primaryKey primary key value
625      * @param queryObject query object containing prepared query and values
626      * @return ReturnType
627      * @throws MusicLockingException
628      * @throws MusicServiceException
629      * @throws MusicQueryException
630      */
631     public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
632             PreparedQueryObject queryObject, Condition conditionInfo)
633             throws MusicLockingException, MusicQueryException, MusicServiceException {
634         long start = System.currentTimeMillis();
635         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
636         String lockId = createLockReference(fullyQualifiedKey, LockType.WRITE);
637         long lockCreationTime = System.currentTimeMillis();
638         ReturnType lockAcqResult = null;
639         logger.info(EELFLoggerDelegate.applicationLogger,
640                 "***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : " + primaryKey);
641         logger.info(EELFLoggerDelegate.applicationLogger,
642                 "***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
643         if (conditionInfo != null) {
644             logger.info(EELFLoggerDelegate.applicationLogger,
645                     "***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
646         }
647         try {
648             lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
649         } catch (MusicLockingException ex) {
650             logger.error(EELFLoggerDelegate.errorLogger,
651                     "Exception while acquireLockWithLease() in atomic put for key: " + primaryKey);
652             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
653             throw new MusicServiceException(
654                     "Cannot perform atomic put for key: " + primaryKey + " : " + ex.getMessage());
655         }
656         long lockAcqTime = System.currentTimeMillis();
657
658         /*
659          * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { logger.info(EELFLoggerDelegate.
660          * applicationLogger,"unable to acquire lock, id " + lockId);
661          * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
662          */
663
664         logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
665         String lockRef = lockId.substring(lockId.lastIndexOf("$"));
666         ReturnType criticalPutResult = null;
667         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
668             criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef, conditionInfo);
669             long criticalPutTime = System.currentTimeMillis();
670             long lockDeleteTime = System.currentTimeMillis();
671             String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
672                     + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
673                     + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
674             criticalPutResult.setTimingInfo(timingInfo);
675         } else {
676             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
677             criticalPutResult = lockAcqResult;
678         }
679         try {
680             voluntaryReleaseLock(fullyQualifiedKey, lockId);
681         } catch (MusicLockingException ex) {
682             logger.info(EELFLoggerDelegate.applicationLogger,
683                     "Exception occured while deleting lock after atomic put for key: " + primaryKey);
684             criticalPutResult.setMessage(criticalPutResult.getMessage() + "Lock release failed");
685         }
686         return criticalPutResult;
687     }
688
689
690
691     /**
692      * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
693      *
694      * @param keyspaceName name of the keyspace
695      * @param tableName name of the table
696      * @param primaryKey primary key value
697      * @param queryObject query object containing prepared query and values
698      * @return ResultSet
699      * @throws MusicServiceException
700      * @throws MusicLockingException
701      * @throws MusicQueryException
702      */
703     public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
704             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
705         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
706         String lockId = createLockReference(fullyQualifiedKey, LockType.READ);
707         long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
708         ReturnType lockAcqResult = null;
709         ResultSet result = null;
710         logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());
711         try {
712             lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
713         } catch (MusicLockingException ex) {
714             logger.error(EELFLoggerDelegate.errorLogger,
715                     "Exception while acquireLockWithLease() in atomic get for key: " + primaryKey);
716             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
717             throw new MusicServiceException(
718                     "Cannot perform atomic get for key: " + primaryKey + " : " + ex.getMessage());
719         }
720         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
721             logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
722             String lockRef = lockId.substring(lockId.lastIndexOf("$"));
723             result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
724         } else {
725             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
726         }
727         try {
728             voluntaryReleaseLock(fullyQualifiedKey, lockId);
729         } catch (MusicLockingException ex) {
730             logger.info(EELFLoggerDelegate.applicationLogger,
731                     "Exception occured while deleting lock after atomic put for key: " + primaryKey);
732             throw new MusicLockingException(ex.getMessage());
733         }
734
735         return result;
736     }
737
738
739
740     /**
741      * @param lockName
742      * @return
743      */
744     public Map<String, Object> validateLock(String lockName) {
745         return MusicUtil.validateLock(lockName);
746     }
747
748     @Override
749     @Deprecated
750     public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
751         PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
752         return null;
753     }
754
755     @Override
756     public List<String> getLockQueue(String fullyQualifiedKey)
757         throws MusicServiceException, MusicQueryException, MusicLockingException {
758         String[] splitString = fullyQualifiedKey.split("\\.");
759         String keyspace = splitString[0];
760         String table = splitString[1];
761         String primaryKeyValue = splitString[2];
762
763         return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
764     }
765     @Override
766     public long getLockQueueSize(String fullyQualifiedKey)
767         throws MusicServiceException, MusicQueryException, MusicLockingException {
768         String[] splitString = fullyQualifiedKey.split("\\.");
769         String keyspace = splitString[0];
770         String table = splitString[1];
771         String primaryKeyValue = splitString[2];
772
773         return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
774     }
775
776     @Override
777     @Deprecated
778     public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
779             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
780         //deprecated
781         return null;
782     }
783
784 }