Merge "Added unit test case"
[music.git] / src / main / java / org / onap / music / service / impl / MusicCassaCore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (c) 2018 IBM. 
7  * ===================================================================
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *
20  * ============LICENSE_END=============================================
21  * ====================================================================
22  */
23
24 package org.onap.music.service.impl;
25
26 import java.io.StringWriter;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.StringTokenizer;
31
32 import org.onap.music.datastore.MusicDataStore;
33 import org.onap.music.datastore.MusicDataStoreHandle;
34 import org.onap.music.datastore.PreparedQueryObject;
35 import org.onap.music.eelf.logging.EELFLoggerDelegate;
36 import org.onap.music.eelf.logging.format.AppMessages;
37 import org.onap.music.eelf.logging.format.ErrorSeverity;
38 import org.onap.music.eelf.logging.format.ErrorTypes;
39 import org.onap.music.exceptions.MusicLockingException;
40 import org.onap.music.exceptions.MusicQueryException;
41 import org.onap.music.exceptions.MusicServiceException;
42 import org.onap.music.lockingservice.cassandra.CassaLockStore;
43 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
44 import org.onap.music.lockingservice.cassandra.MusicLockState;
45 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
46 import org.onap.music.main.MusicUtil;
47 import org.onap.music.main.ResultType;
48 import org.onap.music.main.ReturnType;
49 import org.onap.music.service.MusicCoreService;
50
51 import com.datastax.driver.core.ColumnDefinitions;
52 import com.datastax.driver.core.ColumnDefinitions.Definition;
53 import com.datastax.driver.core.DataType;
54 import com.datastax.driver.core.ResultSet;
55 import com.datastax.driver.core.Row;
56 import com.datastax.driver.core.TableMetadata;
57 import com.google.common.util.concurrent.Monitor.Guard;
58
59 import org.onap.music.datastore.*;
60
61 public class MusicCassaCore implements MusicCoreService {
62
63     public static CassaLockStore mLockHandle = null;;
64     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
65     private static boolean unitTestRun=true;
66     private static MusicCassaCore musicCassaCoreInstance = null;
67     
68     private MusicCassaCore() {
69         
70     }
71     public static MusicCassaCore getInstance() {
72         
73         if(musicCassaCoreInstance == null) {
74             musicCassaCoreInstance = new MusicCassaCore();
75         }
76         return musicCassaCoreInstance;
77     }
78     
79     public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
80         logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
81         long start = System.currentTimeMillis();
82
83         if (mLockHandle == null) {
84             try {
85                 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
86             } catch (Exception e) {
87                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
88                 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
89             }
90         }
91         long end = System.currentTimeMillis();
92         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
93         return mLockHandle;
94     }
95
96
97
98     public  String createLockReference(String fullyQualifiedKey) {
99         String[] splitString = fullyQualifiedKey.split("\\.");
100         String keyspace = splitString[0];
101         String table = splitString[1];
102         String lockName = splitString[2];
103
104         logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
105         long start = System.currentTimeMillis();
106         String lockReference = null;
107         try {
108             lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName);
109         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
110             e.printStackTrace();
111         }
112         long end = System.currentTimeMillis();
113         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
114         return lockReference;
115     }
116
117
118     public  ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException  {
119          evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
120             return acquireLock(fullyQualifiedKey, lockReference);
121     }
122
123     private  void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod) throws MusicLockingException, MusicQueryException, MusicServiceException {
124
125         String[] splitString = fullyQualifiedKey.split("\\.");
126         String keyspace = splitString[0];
127         String table = splitString[1];
128         String primaryKeyValue = splitString[2];
129
130         LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
131         
132         /* Release the lock of the previous holder if it has expired. if the update to the acquire time has not reached due to network delays, simply use the create time as the 
133          * reference*/
134         
135         long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.acquireTime), Long.parseLong(currentLockHolderObject.createTime));
136         if((System.currentTimeMillis() - referenceTime) > leasePeriod) {
137             forciblyReleaseLock(fullyQualifiedKey,  currentLockHolderObject.lockRef+"");
138             logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.lockRef+" forcibly released");
139         }        
140     }
141     
142     private static ReturnType isTopOfLockStore(String keyspace, String table, String primaryKeyValue, String lockReference) throws MusicLockingException, MusicQueryException, MusicServiceException {
143         
144         //return failure to lock holders too early or already evicted from the lock store
145         String topOfLockStoreS = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
146         long topOfLockStoreL = Long.parseLong(topOfLockStoreS);
147         long lockReferenceL = Long.parseLong(lockReference);
148
149         if(lockReferenceL > topOfLockStoreL) {
150             logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is not the lock holder yet");
151             return new ReturnType(ResultType.FAILURE, lockReference+" is not the lock holder yet");
152         }
153                 
154
155         if(lockReferenceL < topOfLockStoreL) {
156             logger.info(EELFLoggerDelegate.applicationLogger, lockReference+" is no longer/or was never in the lock store queue");
157             return new ReturnType(ResultType.FAILURE, lockReference+" is no longer/or was never in the lock store queue");
158            }
159             
160            return new ReturnType(ResultType.SUCCESS, lockReference+" is top of lock store");
161     }
162     
163     public  ReturnType acquireLock(String fullyQualifiedKey, String lockId)
164             throws MusicLockingException, MusicQueryException, MusicServiceException {
165         String[] splitString = lockId.split("\\.");
166         String keyspace = splitString[0].substring(1);//remove '$'
167         String table = splitString[1];
168         String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
169         String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
170         String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
171         
172         ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockRef);
173         
174         if(result.getResult().equals(ResultType.FAILURE))
175                 return result;//not top of the lock store q
176             
177         //check to see if the value of the key has to be synced in case there was a forceful release
178         String syncTable = keyspace+".unsyncedKeys_"+table;
179         String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
180         PreparedQueryObject readQueryObject = new PreparedQueryObject();
181         readQueryObject.appendQueryString(query);
182         ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);            
183         if (results.all().size() != 0) {
184             logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
185             try {
186                 syncQuorum(keyspace, table, primaryKeyValue);
187             } catch (Exception e) {
188                 StringWriter sw = new StringWriter();
189                    logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
190                 String exceptionAsString = sw.toString();
191                 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);            
192             }
193             String cleanQuery = "delete  from music_internal.unsynced_keys where key='"+localFullyQualifiedKey+"';";
194             PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
195             deleteQueryObject.appendQueryString(cleanQuery);
196             MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
197         }
198         
199         getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
200         
201         return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
202     }
203
204
205
206     /**
207      * 
208      * @param tableQueryObject
209      * @param consistency
210      * @return Boolean Indicates success or failure
211      * @throws MusicServiceException 
212      * 
213      * 
214      */
215     public  ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject, String consistency) throws MusicServiceException {
216             boolean result = false;
217     
218             try {
219                 //create shadow locking table 
220                 result = getLockingServiceHandle().createLockQueue(keyspace, table);
221                 if(result == false) 
222                     return ResultType.FAILURE;
223     
224                 result = false;
225                 
226                 //create table to track unsynced_keys
227                 table = "unsyncedKeys_"+table; 
228                 
229                 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
230                         + " ( key text,PRIMARY KEY (key) );";
231                 System.out.println(tabQuery);
232                 PreparedQueryObject queryObject = new PreparedQueryObject(); 
233                 
234                 queryObject.appendQueryString(tabQuery);
235                 result = false;
236                 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
237
238             
239                 //create actual table
240                 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
241             } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
242                 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
243                 throw new MusicServiceException(ex.getMessage());
244             }
245             return result?ResultType.SUCCESS:ResultType.FAILURE;
246     }
247
248     private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
249         logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
250         PreparedQueryObject selectQuery = new PreparedQueryObject();
251         PreparedQueryObject updateQuery = new PreparedQueryObject();
252
253         // get the primary key d
254         TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
255         String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
256                                                                            // primary key
257         DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
258         Object cqlFormattedPrimaryKeyValue =
259                         MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
260
261         // get the row of data from a quorum
262         selectQuery.appendQueryString("SELECT *  FROM " + keyspace + "." + table + " WHERE "
263                         + primaryKeyName + "= ?" + ";");
264         selectQuery.addValue(cqlFormattedPrimaryKeyValue);
265         ResultSet results = null;
266         try {
267             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(selectQuery);
268             // write it back to a quorum
269             Row row = results.one();
270             ColumnDefinitions colInfo = row.getColumnDefinitions();
271             int totalColumns = colInfo.size();
272             int counter = 1;
273             StringBuilder fieldValueString = new StringBuilder("");
274             for (Definition definition : colInfo) {
275                 String colName = definition.getName();
276                 if (colName.equals(primaryKeyName))
277                     continue;
278                 DataType colType = definition.getType();
279                 Object valueObj = MusicDataStoreHandle.getDSHandle().getColValue(row, colName, colType);
280                 Object valueString = MusicUtil.convertToActualDataType(colType, valueObj);
281                 fieldValueString.append(colName + " = ?");
282                 updateQuery.addValue(valueString);
283                 if (counter != (totalColumns - 1))
284                     fieldValueString.append(",");
285                 counter = counter + 1;
286             }
287             updateQuery.appendQueryString("UPDATE " + keyspace + "." + table + " SET "
288                             + fieldValueString + " WHERE " + primaryKeyName + "= ? " + ";");
289             updateQuery.addValue(cqlFormattedPrimaryKeyValue);
290
291             MusicDataStoreHandle.getDSHandle().executePut(updateQuery, "critical");
292         } catch (MusicServiceException | MusicQueryException e) {
293             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.QUERYERROR +""+updateQuery ,ErrorSeverity.MAJOR, ErrorTypes.QUERYERROR);
294         }
295     }
296
297
298
299
300     /**
301      * 
302      * @param query
303      * @return ResultSet
304      */
305     public  ResultSet quorumGet(PreparedQueryObject query) {
306         ResultSet results = null;
307         try {
308             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
309         } catch (MusicServiceException | MusicQueryException e) {
310             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
311         
312         }
313         return results;
314
315     }
316
317
318
319     /**
320      * 
321      * @param fullyQualifiedKey lockName
322      * @return
323      */
324     public  String whoseTurnIsIt(String fullyQualifiedKey) {
325         String[] splitString = fullyQualifiedKey.split("\\.");
326         String keyspace = splitString[0];
327         String table = splitString[1];
328         String primaryKeyValue = splitString[2];
329         try {
330             return "$" + fullyQualifiedKey + "$" 
331                     + getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue).lockRef;
332         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
333              logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
334         }
335         return null;
336     }
337
338     /**
339      * 
340      * @param lockReference
341      * @return
342      */
343     public static String getLockNameFromId(String lockReference) {
344         StringTokenizer st = new StringTokenizer(lockReference);
345         return st.nextToken("$");
346     }
347
348     @Override
349     public void destroyLockRef(String lockId) {
350         long start = System.currentTimeMillis();
351         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
352         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
353         String[] splitString = fullyQualifiedKey.split("\\.");
354         String keyspace = splitString[0];
355         String table = splitString[1];
356         String primaryKeyValue = splitString[2];
357         try {
358             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef);
359         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
360             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef  ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
361         } 
362         long end = System.currentTimeMillis();
363         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
364     }
365     
366     public  MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) {
367         long start = System.currentTimeMillis();
368         String[] splitString = fullyQualifiedKey.split("\\.");
369         String keyspace = splitString[0];
370         String table = splitString[1];
371         String primaryKeyValue = splitString[2];
372         try {
373             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference);
374         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
375             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockReference  ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
376         } 
377         long end = System.currentTimeMillis();
378         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
379         return new MusicLockState(LockStatus.UNLOCKED, "");
380     }
381
382     @Override
383     public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
384         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
385         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
386         if (voluntaryRelease) {
387             return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
388         } else {
389             return forciblyReleaseLock(fullyQualifiedKey, lockRef);
390         }
391     }
392     
393     public   MusicLockState  voluntaryReleaseLock(String fullyQualifiedKey, String lockReference) {
394         return destroyLockRef(fullyQualifiedKey, lockReference);
395     }
396
397     public  MusicLockState  forciblyReleaseLock(String fullyQualifiedKey, String lockReference) {
398         String[] splitString = fullyQualifiedKey.split("\\.");
399         String keyspace = splitString[0];
400         String table = splitString[1];
401
402             //leave a signal that this key could potentially be unsynchronized
403         String syncTable = keyspace+".unsyncedKeys_"+table;    
404         PreparedQueryObject queryObject = new PreparedQueryObject();
405         String values = "(?)";
406         queryObject.addValue(fullyQualifiedKey);
407         String insQuery = "insert into "+syncTable+" (key) values "+values+";";
408         queryObject.appendQueryString(insQuery);
409         try {
410             MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
411         } catch (Exception e) {
412             logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". " 
413                         + e.getMessage());
414         }
415         
416         //now release the lock
417         return destroyLockRef(fullyQualifiedKey, lockReference);
418     }
419
420     /**
421      * 
422      * @param lockName
423      * @throws MusicLockingException 
424      */
425     public  void deleteLock(String lockName) throws MusicLockingException {
426             //deprecated
427         }
428
429     // Prepared Query Additions.
430
431     /**
432      * 
433      * @param queryObject
434      * @return ReturnType
435      * @throws MusicServiceException
436      */
437     public  ReturnType eventualPut(PreparedQueryObject queryObject) {
438         boolean result = false;
439         try {
440             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
441         } catch (MusicServiceException | MusicQueryException ex) {
442             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle "  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
443             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " " + ex);
444             return new ReturnType(ResultType.FAILURE, ex.getMessage());
445         }
446         if (result) {
447             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
448         } else {
449             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
450         }
451     }
452     
453     /**
454      * 
455      * @param queryObject
456      * @return ReturnType
457      * @throws MusicServiceException
458      */
459     public  ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
460         boolean result = false;
461         long guard = 0;
462         PreparedQueryObject getGaurd = new PreparedQueryObject();
463         getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
464         getGaurd.addValue(primaryKey);
465         try {
466             ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
467             Row row = getGaurdResult.one();
468             if (row != null) {
469                 guard = row.getLong("guard");
470                 long timeOfWrite = System.currentTimeMillis();
471                 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
472                 String query = queryObject.getQuery();
473                 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
474                     if (queryObject.getOperation().equalsIgnoreCase("delete"))
475                         query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
476                     else
477                         query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
478                 }
479                 queryObject.replaceQueryString(query);
480             }
481             
482         } catch (MusicServiceException | MusicQueryException e) {
483             logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage());
484         }   
485         try {
486             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
487         } catch (MusicServiceException | MusicQueryException ex) {
488             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle "  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
489             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " " + ex);
490             return new ReturnType(ResultType.FAILURE, ex.getMessage());
491         }
492         if (result) {
493             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
494         } else {
495             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
496         }
497     }
498     
499     /**
500      * 
501      * @param keyspace
502      * @param table
503      * @param primaryKeyValue
504      * @param queryObject
505      * @param lockId
506      * @return
507      */
508     public  ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
509                     PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
510         long start = System.currentTimeMillis();
511         try {
512         ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue,
513                 lockId.substring(lockId.lastIndexOf("$")+1));
514         if(result.getResult().equals(ResultType.FAILURE))
515                 return result;//not top of the lock store q
516
517         if (conditionInfo != null)
518             try {
519               if (conditionInfo.testCondition() == false)
520                   return new ReturnType(ResultType.FAILURE,
521                                   "Lock acquired but the condition is not true");
522             } catch (Exception e) {
523               return new ReturnType(ResultType.FAILURE,
524                       "Exception thrown while checking the condition, check its sanctity:\n"
525                                       + e.getMessage());
526             }
527              
528           String query = queryObject.getQuery();
529           long timeOfWrite = System.currentTimeMillis();
530           long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$")+1));
531           long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
532           // TODO: use Statement instead of modifying query
533             if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
534                 if (queryObject.getOperation().equalsIgnoreCase("delete"))
535                     query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
536                 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
537                     query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
538                 else
539                     query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
540             }
541           queryObject.replaceQueryString(query);
542           MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
543           dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
544           long end = System.currentTimeMillis();
545           logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
546         }catch (MusicQueryException | MusicServiceException | MusicLockingException  e) {
547             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
548             return new ReturnType(ResultType.FAILURE,
549                             "Exception thrown while doing the critical put\n"
550                                             + e.getMessage());
551         }
552          return new ReturnType(ResultType.SUCCESS, "Update performed");
553     }
554
555     
556     /**
557      * 
558      * @param queryObject
559      * @param consistency
560      * @return Boolean Indicates success or failure
561      * @throws MusicServiceException 
562      * 
563      * 
564      */
565     public  ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
566         // this is mainly for some functions like keyspace creation etc which does not
567         // really need the bells and whistles of Music locking.
568         boolean result = false;
569         try {
570             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
571         } catch (MusicQueryException | MusicServiceException ex) {
572             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
573                     ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
574             throw new MusicServiceException(ex.getMessage());
575         }
576         return result ? ResultType.SUCCESS : ResultType.FAILURE;
577     }
578
579     /**
580      * This method performs DDL operation on cassandra.
581      * 
582      * @param queryObject query object containing prepared query and values
583      * @return ResultSet
584      * @throws MusicServiceException 
585      */
586     public  ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
587         ResultSet results = null;
588         try {
589             results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
590         } catch (MusicQueryException | MusicServiceException e) {
591             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
592             throw new MusicServiceException(e.getMessage());
593         }
594         return results;
595     }
596
597     /**
598      * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
599      * is used to check if the resource is free.
600      * 
601      * @param keyspace name of the keyspace
602      * @param table name of the table
603      * @param primaryKeyValue primary key value
604      * @param queryObject query object containing prepared query and values
605      * @param lockId lock ID to check if the resource is free to perform the operation.
606      * @return ResultSet
607      */
608     public  ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
609                     PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
610         ResultSet results = null;
611         
612         try {
613             ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, 
614                     lockId.substring(lockId.lastIndexOf("$")+1));
615             if(result.getResult().equals(ResultType.FAILURE))
616                     return null;//not top of the lock store q
617                 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
618         } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
619                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
620         }
621         return results;
622     }
623
624     /**
625      * This method performs DML operation on cassandra, when the lock of the dd is acquired.
626      * 
627      * @param keyspaceName name of the keyspace
628      * @param tableName name of the table
629      * @param primaryKey primary key value
630      * @param queryObject query object containing prepared query and values
631      * @return ReturnType
632      * @throws MusicLockingException 
633      * @throws MusicServiceException 
634      * @throws MusicQueryException 
635      */
636     public  ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
637                     PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
638         long start = System.currentTimeMillis();
639         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
640         String lockId = createLockReference(fullyQualifiedKey);
641         long lockCreationTime = System.currentTimeMillis();
642         ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
643         long lockAcqTime = System.currentTimeMillis();
644
645         if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
646             logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
647             voluntaryReleaseLock(fullyQualifiedKey,lockId);
648             return lockAcqResult;
649         }
650
651         logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
652         String lockRef = lockId.substring(lockId.lastIndexOf("$"));
653         ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
654                         queryObject, lockRef, conditionInfo);
655         long criticalPutTime = System.currentTimeMillis();
656         voluntaryReleaseLock(fullyQualifiedKey,lockId);
657         long lockDeleteTime = System.currentTimeMillis();
658         String timingInfo = "|lock creation time:" + (lockCreationTime - start)
659                         + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
660                         + "|critical put time:" + (criticalPutTime - lockAcqTime)
661                         + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
662         criticalPutResult.setTimingInfo(timingInfo);
663         return criticalPutResult;
664     }
665     
666
667
668
669     /**
670      * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
671      * 
672      * @param keyspaceName name of the keyspace
673      * @param tableName name of the table
674      * @param primaryKey primary key value
675      * @param queryObject query object containing prepared query and values
676      * @return ResultSet
677      * @throws MusicServiceException
678      * @throws MusicLockingException 
679      * @throws MusicQueryException 
680      */
681     public  ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
682                     PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
683         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
684         String lockId = createLockReference(fullyQualifiedKey);
685         long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
686         ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockId);
687         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
688             logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
689             String lockRef = lockId.substring(lockId.lastIndexOf("$"));
690             ResultSet result =
691                             criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
692             voluntaryReleaseLock(fullyQualifiedKey,lockId);
693             return result;
694         } else {
695             voluntaryReleaseLock(fullyQualifiedKey,lockId);
696             logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
697             return null;
698         }
699     }
700
701   
702     
703     /**
704      * @param lockName
705      * @return
706      */
707     public Map<String, Object> validateLock(String lockName) {
708         Map<String, Object> resultMap = new HashMap<>();
709         String[] locks = lockName.split("\\.");
710         if(locks.length < 3) {
711             resultMap.put("Error", "Invalid lock. Please make sure lock is of the type keyspaceName.tableName.primaryKey");
712             return resultMap;
713         }
714         String keyspace= locks[0];
715         if(keyspace.startsWith("$"))
716             keyspace = keyspace.substring(1);
717         resultMap.put("keyspace",keyspace);
718         return resultMap;
719     }
720     
721
722     public static void main(String[] args) {
723         String x = "axe top";
724         x = x.replaceFirst("top", "sword");
725         System.out.print(x); //returns sword pickaxe
726     }
727
728
729
730     @Override
731     public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
732             PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
733         //Deprecated
734         return null;
735     }
736     @Override
737     public List<String> getLockQueue(String fullyQualifiedKey)
738             throws MusicServiceException, MusicQueryException, MusicLockingException {
739         String[] splitString = fullyQualifiedKey.split("\\.");
740         String keyspace = splitString[0];
741         String table = splitString[1];
742         String primaryKeyValue = splitString[2];
743
744         return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
745     }
746     @Override
747     public long getLockQueueSize(String fullyQualifiedKey)
748             throws MusicServiceException, MusicQueryException, MusicLockingException {
749         String[] splitString = fullyQualifiedKey.split("\\.");
750         String keyspace = splitString[0];
751         String table = splitString[1];
752         String primaryKeyValue = splitString[2];
753
754         return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
755     }
756     @Override
757     public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
758             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
759         //deprecated
760         return null;
761     }
762
763 }