Merge "Include owner in createLockRef"
[music.git] / music-core / src / main / java / org / onap / music / service / impl / MusicCassaCore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (c) 2018 IBM. 
7  * ===================================================================
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.service.impl;
27
28 import java.io.StringWriter;
29 import java.util.Collections;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.StringTokenizer;
35
36 import javax.ws.rs.core.MultivaluedMap;
37
38 import org.onap.music.datastore.Condition;
39 import org.onap.music.datastore.MusicDataStore;
40 import org.onap.music.datastore.MusicDataStoreHandle;
41 import org.onap.music.datastore.PreparedQueryObject;
42 import org.onap.music.datastore.jsonobjects.JsonDelete;
43 import org.onap.music.datastore.jsonobjects.JsonIndex;
44 import org.onap.music.datastore.jsonobjects.JsonInsert;
45 import org.onap.music.datastore.jsonobjects.JsonKeySpace;
46 import org.onap.music.datastore.jsonobjects.JsonSelect;
47 import org.onap.music.datastore.jsonobjects.JsonTable;
48 import org.onap.music.datastore.jsonobjects.JsonUpdate;
49 import org.onap.music.eelf.logging.EELFLoggerDelegate;
50 import org.onap.music.eelf.logging.format.AppMessages;
51 import org.onap.music.eelf.logging.format.ErrorSeverity;
52 import org.onap.music.eelf.logging.format.ErrorTypes;
53 import org.onap.music.exceptions.MusicDeadlockException;
54 import org.onap.music.exceptions.MusicLockingException;
55 import org.onap.music.exceptions.MusicQueryException;
56 import org.onap.music.exceptions.MusicServiceException;
57 import org.onap.music.lockingservice.cassandra.CassaLockStore;
58 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
59 import org.onap.music.lockingservice.cassandra.LockType;
60 import org.onap.music.lockingservice.cassandra.MusicLockState;
61 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
62 import org.onap.music.main.MusicUtil;
63 import org.onap.music.main.ResultType;
64 import org.onap.music.main.ReturnType;
65 import org.onap.music.service.MusicCoreService;
66
67 import com.datastax.driver.core.DataType;
68 import com.datastax.driver.core.ResultSet;
69 import com.datastax.driver.core.Row;
70 import com.datastax.driver.core.TableMetadata;
71
72 public class MusicCassaCore implements MusicCoreService {
73
74     private static CassaLockStore mLockHandle = null;
75     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
76     private static MusicCassaCore musicCassaCoreInstance = null;
77     private static Set<String> set = Collections.synchronizedSet(new HashSet<String>());
78
79     private MusicCassaCore() {
80         // not going to happen
81     }
82     
83     public static CassaLockStore getmLockHandle() {
84         return mLockHandle;
85     }
86
87     public static void setmLockHandle(CassaLockStore mLockHandle) {
88         MusicCassaCore.mLockHandle = mLockHandle;
89     }
90     
91     public static MusicCassaCore getInstance() {
92
93         if(musicCassaCoreInstance == null) {
94             musicCassaCoreInstance = new MusicCassaCore();
95         }
96         return musicCassaCoreInstance;
97     }
98
99     public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
100         logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
101         long start = System.currentTimeMillis();
102
103         if (mLockHandle == null) {
104             try {
105                 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
106             } catch (Exception e) {
107                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
108                 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
109             }
110         }
111         long end = System.currentTimeMillis();
112         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
113         return mLockHandle;
114     }
115
116     public String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException {
117         return createLockReferenceAtomic(fullyQualifiedKey, LockType.WRITE);
118     }
119     public String createLockReference(String fullyQualifiedKey, String owner) throws MusicLockingException {
120         return createLockReference(fullyQualifiedKey, LockType.WRITE, owner);
121     }
122
123     /**
124      * This will be called for Atomic calls
125      * 
126      */
127     public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
128         String[] splitString = fullyQualifiedKey.split("\\.");
129         if (splitString.length < 3) {
130             throw new MusicLockingException("Missing or incorrect lock details. Check table or key name.");
131         }
132         String keyspace = splitString[0];
133         String table = splitString[1];
134         String lockName = splitString[2];
135
136         logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
137         long start = 0L;
138         long end = 0L;
139         String lockReference = null;
140         LockObject peek = null;
141
142         /** Lets check for an existing lock. 
143          * This will allow us to limit the amount of requests going forward.
144          */
145         start = System.currentTimeMillis();
146         try {
147             peek = getLockingServiceHandle().peekLockQueue(keyspace, table, lockName);
148         } catch (MusicServiceException | MusicQueryException e) {
149             //logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(),e);
150             throw new MusicLockingException("Error getting lockholder info for key [" + lockName +"]:" + e.getMessage());
151         }
152         
153         if(peek!=null && (peek.getLocktype()!=null && peek.getLocktype().equals(LockType.WRITE)) && peek.getAcquireTime()!=null && peek.getLockRef()!=null) {
154             long currentTime = System.currentTimeMillis();
155             if((currentTime-Long.parseLong(peek.getAcquireTime()))<MusicUtil.getDefaultLockLeasePeriod()){
156                 //logger.info(EELFLoggerDelegate.applicationLogger,"Lock holder exists and lease not expired. Please try again for key="+lockName);
157                 throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again.");
158             }
159         }
160         end = System.currentTimeMillis();
161         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to check for lock reference for key [" + lockName + "]:" + (end - start) + " ms");
162
163         start = System.currentTimeMillis();
164         /* We are Creating a Thread safe set and adding the key to the set. 
165         * if a key exists then it wil be passed over and not go to the lock creation. 
166         * If a key doesn't exist then it will set the value in the set and continue to create a lock. 
167         *
168         * This will ensure that no 2 threads using the same key will be able to try to create a lock
169         * This wil in turn squash the amout of LWT Chatter in Cassandra an reduce the amount of
170         * WriteTimeoutExceptions being experiences on single keys.
171         */
172         if ( set.add(fullyQualifiedKey)) {
173             try {
174                 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype,null);
175                 set.remove(fullyQualifiedKey);
176             } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
177                 set.remove(fullyQualifiedKey);
178                 throw new MusicLockingException(e.getMessage());
179             } catch (Exception e) {
180                 set.remove(fullyQualifiedKey);
181                 e.printStackTrace();
182                 logger.error(EELFLoggerDelegate.applicationLogger,"Exception in creatLockEnforced:"+ e.getMessage(),e);
183                 throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. " + e.getMessage());
184             }
185         } else {
186             throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again.");
187         }
188         end = System.currentTimeMillis();
189         logger.info(EELFLoggerDelegate.debugLogger,"### Set = " + set);
190         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference  for key [" + lockName + "]:" + (end - start) + " ms");
191         return lockReference;
192
193         //return createLockReference(fullyQualifiedKey, locktype, null);
194     }
195
196     public String createLockReference(String fullyQualifiedKey, LockType locktype, String owner) throws MusicLockingException {
197         String[] splitString = fullyQualifiedKey.split("\\.");
198         if (splitString.length < 3) {
199             throw new MusicLockingException("Missing or incorrect lock details. Check table or key name.");
200         }
201         String keyspace = splitString[0];
202         String table = splitString[1];
203         String lockName = splitString[2];
204
205         logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
206         long start = 0L;
207         long end = 0L;
208         String lockReference = null;
209
210         /* Check for a Deadlock */
211         try {
212             boolean deadlock = getLockingServiceHandle().checkForDeadlock(keyspace, table, lockName, locktype, owner, false);
213             if (deadlock) {
214                 MusicDeadlockException e = new MusicDeadlockException("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + table + "." + lockName);
215                 e.setValues(owner, keyspace, table, lockName);
216                 throw e;
217             }
218         } catch (MusicDeadlockException e) {
219             //just threw this, no need to wrap it
220             throw e;
221         } catch (MusicServiceException | MusicQueryException e) {
222             logger.error(EELFLoggerDelegate.applicationLogger, e);
223             throw new MusicLockingException("Unable to check for deadlock. " + e.getMessage(), e);
224         }
225         end = System.currentTimeMillis();
226         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to check for deadlock for key [" + lockName + "]:" + (end - start) + " ms");
227
228         start = System.currentTimeMillis();
229         try {
230             lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype, owner);
231         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
232             logger.info(EELFLoggerDelegate.applicationLogger,e.getMessage(),e);
233             throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again: " + e.getMessage());
234         } catch (Exception e) {
235             logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(),e);
236             throw new MusicLockingException("Unable to create lock reference. " + e.getMessage(), e);
237         }
238         end = System.currentTimeMillis();
239         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference  for key [" + lockName + "]:" + (end - start) + " ms");
240         return lockReference;
241     }
242     
243     public ReturnType promoteLock(String lockId) throws MusicLockingException {
244         String[] splitString = lockId.split("\\.");
245         String keyspace = splitString[0].substring(1);//remove '$'
246         String table = splitString[1];
247         String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
248         String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
249         
250         logger.info(EELFLoggerDelegate.applicationLogger,"Attempting to promote lock " + lockId);
251
252         try {
253             return getLockingServiceHandle().promoteLock(keyspace, table, primaryKeyValue, lockRef);
254         } catch (MusicServiceException e) {
255             throw new MusicLockingException("Unable to promote lock. ", e);
256         } catch (MusicQueryException e) {
257             throw new MusicLockingException("Unable to promote lock. ", e);
258         }
259         
260     }
261
262
263     public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
264             throws MusicLockingException, MusicQueryException, MusicServiceException  {
265         evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
266         return acquireLock(fullyQualifiedKey, lockReference);
267     }
268
269     private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod)
270             throws MusicLockingException, MusicQueryException, MusicServiceException {
271         String[] splitString = fullyQualifiedKey.split("\\.");
272         String keyspace = splitString[0];
273         String table = splitString[1];
274         String primaryKeyValue = splitString[2];
275
276         LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
277
278         if (!currentLockHolderObject.getIsLockOwner()) { // no lock holder
279             return;
280         }
281         /*
282          * Release the lock of the previous holder if it has expired. if the update to the acquire time has
283          * not reached due to network delays, simply use the create time as the reference
284          */
285         long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.getAcquireTime()),
286                 Long.parseLong(currentLockHolderObject.getCreateTime()));
287         if ((System.currentTimeMillis() - referenceTime) > leasePeriod) {
288             forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.getLockRef() + "");
289             logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.getLockRef() + " forcibly released");
290         }
291     }
292
293     public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
294             throws MusicLockingException, MusicQueryException, MusicServiceException {
295         String[] splitString = lockId.split("\\.");
296         String keyspace = splitString[0].substring(1);//remove '$'
297         String table = splitString[1];
298         String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
299         String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
300         String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
301
302         LockObject lockInfo = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue, lockRef);
303
304         if (!lockInfo.getIsLockOwner()) {
305             return new ReturnType(ResultType.FAILURE, lockId + " is not a lock holder");//not top of the lock store q
306         }
307         
308         if (getLockingServiceHandle().checkForDeadlock(keyspace, table, primaryKeyValue, lockInfo.getLocktype(), lockInfo.getOwner(), true)) {
309             MusicDeadlockException e = new MusicDeadlockException("Deadlock detected when " + lockInfo.getOwner()  + " tried to create lock on " + keyspace + "." + table + "." + primaryKeyValue);
310             e.setValues(lockInfo.getOwner(), keyspace, table, primaryKeyValue);
311             throw e;
312         }
313
314         //check to see if the value of the key has to be synced in case there was a forceful release
315         String syncTable = keyspace+".unsyncedKeys_"+table;
316         String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
317         PreparedQueryObject readQueryObject = new PreparedQueryObject();
318         readQueryObject.appendQueryString(query);
319         ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
320         if (!results.all().isEmpty()) {
321             logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
322             try {
323                 syncQuorum(keyspace, table, primaryKeyValue);
324             } catch (Exception e) {
325                 StringWriter sw = new StringWriter();
326                     logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
327                         ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
328                 String exceptionAsString = sw.toString();
329                 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
330             }
331             String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
332             PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
333             deleteQueryObject.appendQueryString(cleanQuery);
334             MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
335         }
336
337         getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
338
339         return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
340     }
341
342
343
344     /**
345      *
346      * @param tableQueryObject
347      * @param consistency
348      * @return Boolean Indicates success or failure
349      * @throws MusicServiceException
350      *
351      *
352      */
353     public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
354             String consistency) throws MusicServiceException {
355         boolean result = false;
356
357         try {
358             // create shadow locking table
359             result = getLockingServiceHandle().createLockQueue(keyspace, table);
360             if (result == false)
361                 return ResultType.FAILURE;
362
363             result = false;
364
365             // create table to track unsynced_keys
366             table = "unsyncedKeys_" + table;
367
368             String tabQuery =
369                     "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " ( key text,PRIMARY KEY (key) );";
370             PreparedQueryObject queryObject = new PreparedQueryObject();
371
372             queryObject.appendQueryString(tabQuery);
373             result = false;
374             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
375
376             // create actual table
377             result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
378         } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
379             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
380                     ErrorTypes.MUSICSERVICEERROR);
381             throw new MusicServiceException(ex.getMessage());
382         }
383         return result ? ResultType.SUCCESS : ResultType.FAILURE;
384     }
385
386     private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
387         logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
388         PreparedQueryObject selectQuery = new PreparedQueryObject();
389         PreparedQueryObject updateQuery = new PreparedQueryObject();
390
391         // get the primary key d
392         TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
393         String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
394                                                                             // primary key
395         DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
396         Object cqlFormattedPrimaryKeyValue =
397                         MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
398
399         // get the row of data from a quorum
400         selectQuery.appendQueryString("SELECT *  FROM " + keyspace + "." + table + " WHERE "
401                         + primaryKeyName + "= ?" + ";");
402         selectQuery.addValue(cqlFormattedPrimaryKeyValue);
403         MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
404             cqlFormattedPrimaryKeyValue);
405     }
406
407     /**
408      *
409      * @param query
410      * @return ResultSet
411      */
412     public ResultSet quorumGet(PreparedQueryObject query) {
413         ResultSet results = null;
414         try {
415             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
416         } catch (MusicServiceException | MusicQueryException e) {
417             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR,
418                 ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
419
420         }
421         return results;
422     }
423
424     public String whoseTurnIsIt(String fullyQualifiedKey) {
425         String[] splitString = fullyQualifiedKey.split("\\.");
426         String keyspace = splitString[0];
427         String table = splitString[1];
428         String primaryKeyValue = splitString[2];
429         try {
430             LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
431             if (!lockOwner.getIsLockOwner()) {
432                 return null;
433             }
434             return "$" + fullyQualifiedKey + "$" + lockOwner.getLockRef();
435         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
436             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.LOCKINGERROR + fullyQualifiedKey,
437                     ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
438         }
439         return null;
440     }
441     
442     public List<String> getCurrentLockHolders(String fullyQualifiedKey) {
443         String[] splitString = fullyQualifiedKey.split("\\.");
444         String keyspace = splitString[0];
445         String table = splitString[1];
446         String primaryKeyValue = splitString[2];
447         try {
448             return getLockingServiceHandle().getCurrentLockHolders(keyspace, table, primaryKeyValue);
449         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
450             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
451         }
452         return null;
453     }
454
455     /**
456      *
457      * @param lockReference
458      * @return
459      */
460     public static String getLockNameFromId(String lockReference) {
461         StringTokenizer st = new StringTokenizer(lockReference);
462         return st.nextToken("$");
463     }
464
465     @Override
466     public void destroyLockRef(String lockId) throws MusicLockingException {
467         long start = System.currentTimeMillis();
468         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
469         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
470         String[] splitString = fullyQualifiedKey.split("\\.");
471         String keyspace = splitString[0];
472         String table = splitString[1];
473         String primaryKeyValue = splitString[2];
474         try {
475             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
476         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
477             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
478                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
479             throw new MusicLockingException(e.getMessage());
480         }
481         long end = System.currentTimeMillis();
482         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
483     }
484
485     public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
486         long start = System.currentTimeMillis();
487         String[] splitString = fullyQualifiedKey.split("\\.");
488         String keyspace = splitString[0];
489         String table = splitString[1];
490         String primaryKeyValue = splitString[2];
491         try {
492             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
493         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
494             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
495                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
496             throw new MusicLockingException(e.getMessage());
497         }
498         long end = System.currentTimeMillis();
499         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
500         return new MusicLockState(LockStatus.UNLOCKED, "");
501     }
502
503     @Override
504     public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
505         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
506         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
507         if (voluntaryRelease) {
508             return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
509         } else {
510             return forciblyReleaseLock(fullyQualifiedKey, lockRef);
511         }
512     }
513
514     public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
515             throws MusicLockingException {
516         MusicLockState result = null;
517         try {
518             result = destroyLockRef(fullyQualifiedKey, lockReference);
519         } catch (Exception ex) {
520             logger.info(EELFLoggerDelegate.applicationLogger,
521                     "Exception in voluntaryReleaseLock() for " + fullyQualifiedKey + "ref: " + lockReference);
522             throw new MusicLockingException(ex.getMessage());
523         }
524         return result;
525     }
526
527     public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
528         String[] splitString = fullyQualifiedKey.split("\\.");
529         String keyspace = splitString[0];
530         String table = splitString[1];
531
532         //leave a signal that this key could potentially be unsynchronized
533         String syncTable = keyspace+".unsyncedKeys_"+table;
534         PreparedQueryObject queryObject = new PreparedQueryObject();
535         String values = "(?)";
536         queryObject.addValue(fullyQualifiedKey);
537         String insQuery = "insert into "+syncTable+" (key) values "+values+";";
538         queryObject.appendQueryString(insQuery);
539         try {
540             MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
541         } catch (Exception e) {
542             logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
543                         + e.getMessage(), e);
544         }
545
546         //now release the lock
547         return destroyLockRef(fullyQualifiedKey, lockReference);
548     }
549
550     @Override
551     public List<String> releaseAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicLockingException, MusicServiceException, MusicQueryException {
552 //        System.out.println("IN RELEASEALLLOCKSFOROWNER, ");
553
554         List<String> lockIds = getLockingServiceHandle().getAllLocksForOwner(ownerId, keyspace, table);
555         for (String lockId : lockIds) {
556 //            System.out.println(" LOCKID = " + lockId);
557             //return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
558             releaseLock("$" + keyspace + "." + table + "." + lockId, true);
559         }
560         return lockIds;
561     }
562
563     /**
564      *
565      * @param lockName
566      * @throws MusicLockingException
567      */
568     @Deprecated
569     public  void deleteLock(String lockName) throws MusicLockingException {
570         throw new MusicLockingException("Depreciated Method Delete Lock");
571     }
572
573     // Prepared Query Additions.
574
575     /**
576      *
577      * @param queryObject
578      * @return ReturnType
579      * @throws MusicServiceException
580      */
581     public  ReturnType eventualPut(PreparedQueryObject queryObject) {
582         boolean result = false;
583         try {
584             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
585         } catch (MusicServiceException | MusicQueryException ex) {
586             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle "  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
587             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " " + ex);
588             return new ReturnType(ResultType.FAILURE, ex.getMessage());
589         }
590         if (result) {
591             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
592         } else {
593             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
594         }
595     }
596
597     /**
598      *
599      * @param queryObject
600      * @return ReturnType
601      * @throws MusicServiceException
602      */
603     public  ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
604         boolean result = false;
605         long guard = 0;
606         PreparedQueryObject getGaurd = new PreparedQueryObject();
607         getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
608         getGaurd.addValue(primaryKey);
609         try {
610             ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
611             Row row = getGaurdResult.one();
612             if (row != null) {
613                 guard = row.getLong("guard");
614                 long timeOfWrite = System.currentTimeMillis();
615                 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
616                 String query = queryObject.getQuery();
617                 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
618                     if (queryObject.getOperation().equalsIgnoreCase("delete"))
619                         query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
620                     else
621                         query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
622                 }
623                 queryObject.replaceQueryString(query);
624             }
625
626         } catch (MusicServiceException | MusicQueryException e) {
627             logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
628         }
629         try {
630             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
631         } catch (MusicServiceException | MusicQueryException ex) {
632             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
633                 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
634             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " ", ex);
635             return new ReturnType(ResultType.FAILURE, ex.getMessage());
636         }
637         if (result) {
638             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
639         } else {
640             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
641         }
642     }
643
644     /**
645      *
646      * @param keyspace
647      * @param table
648      * @param primaryKeyValue
649      * @param queryObject
650      * @param lockId
651      * @return
652      */
653     public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
654             PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
655         long start = System.currentTimeMillis();
656         try {
657             String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
658             if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
659                 return new ReturnType(ResultType.FAILURE,"Lock value '" + keyLock + "' and key value '"
660                 + primaryKeyValue + "' not match. Please check your values: " 
661                 + lockId + " .");
662             }
663             LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
664                     lockId.substring(lockId.lastIndexOf("$") + 1));
665
666             if ( lockObject == null ) {
667                 return new ReturnType(ResultType.FAILURE, lockId + " does not exist.");
668             } else if (!lockObject.getIsLockOwner()) {
669                 return new ReturnType(ResultType.FAILURE, lockId + " is not the lock holder");
670             } else if (lockObject.getLocktype() != LockType.WRITE) {
671                 return new ReturnType(ResultType.FAILURE,
672                         "Attempting to do write operation, but " + lockId + " is a read lock");
673             }
674
675             if (conditionInfo != null) {
676                 try {
677                     if (conditionInfo.testCondition() == false)
678                         return new ReturnType(ResultType.FAILURE, "Lock acquired but the condition is not true");
679                 } catch (Exception e) {
680                     logger.error(EELFLoggerDelegate.errorLogger, e);
681                     return new ReturnType(ResultType.FAILURE,
682                             "Exception thrown while checking the condition, check its sanctity:\n" + e.getMessage());
683                 }
684             }
685             String query = queryObject.getQuery();
686             long timeOfWrite = System.currentTimeMillis();
687             long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$") + 1));
688             long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
689             // TODO: use Statement instead of modifying query
690             if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
691                 if (queryObject.getOperation().equalsIgnoreCase("delete"))
692                     query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
693                 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
694                     query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
695                 else
696                     query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
697             }
698             queryObject.replaceQueryString(query);
699             MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
700             dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
701             long end = System.currentTimeMillis();
702             logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
703         } catch (MusicQueryException | MusicServiceException | MusicLockingException  e) {
704             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
705             return new ReturnType(ResultType.FAILURE,
706                 "Exception thrown while doing the critical put: "
707                 + e.getMessage());
708         }
709         return new ReturnType(ResultType.SUCCESS, "Update performed");
710     }
711
712
713     /**
714      *
715      * @param queryObject
716      * @param consistency
717      * @return Boolean Indicates success or failure
718      * @throws MusicServiceException
719      *
720      *
721      */
722     public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException,MusicQueryException {
723         // this is mainly for some functions like keyspace creation etc which does not
724         // really need the bells and whistles of Music locking.
725         boolean result = false;
726 //        try {
727         result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
728 //        } catch (MusicQueryException | MusicServiceException ex) {
729             // logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
730             //     ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
731 //            throw new MusicServiceException(ex.getMessage(),ex);
732 //        }
733         return result ? ResultType.SUCCESS : ResultType.FAILURE;
734     }
735
736     /**
737      * This method performs DDL operation on cassandra.
738      *
739      * @param queryObject query object containing prepared query and values
740      * @return ResultSet
741      * @throws MusicServiceException
742      */
743     public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
744         ResultSet results = null;
745         try {
746             results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
747         } catch (MusicQueryException | MusicServiceException e) {
748             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
749             throw new MusicServiceException(e.getMessage());
750         }
751         return results;
752     }
753
754     /**
755      * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
756      * is used to check if the resource is free.
757      *
758      * @param keyspace name of the keyspace
759      * @param table name of the table
760      * @param primaryKeyValue primary key value
761      * @param queryObject query object containing prepared query and values
762      * @param lockId lock ID to check if the resource is free to perform the operation.
763      * @return ResultSet
764      */
765     public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
766                     PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
767         ResultSet results = null;
768         String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
769         try {
770             if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
771                 throw new MusicLockingException("Lock value '" + keyLock + "' and key value '"
772                 + primaryKeyValue + "' do not match. Please check your values: " 
773                 + lockId + " .");
774             }
775             LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
776                 lockId.substring(lockId.lastIndexOf("$") + 1));
777             if (null == lockObject) {
778                 throw new MusicLockingException("No Lock Object. Please check if lock name or key is correct." 
779                     + lockId + " .");
780             }
781             if ( !lockObject.getIsLockOwner()) {
782                 return null;// not top of the lock store q
783             }
784             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
785         } catch ( MusicLockingException e ) {
786             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
787                     .WARN, ErrorTypes.MUSICSERVICEERROR);
788                 throw new MusicServiceException(
789                     "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());
790         } catch (MusicQueryException | MusicServiceException e) {
791             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
792                     .WARN, ErrorTypes.MUSICSERVICEERROR, e);
793                 throw new MusicServiceException(
794                     "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());    
795         }
796         return results;
797     }
798
799     /**
800      * This method performs DML operation on cassandra, when the lock of the dd is acquired.
801      *
802      * @param keyspaceName name of the keyspace
803      * @param tableName name of the table
804      * @param primaryKey primary key value
805      * @param queryObject query object containing prepared query and values
806      * @return ReturnType
807      * @throws MusicLockingException
808      * @throws MusicServiceException
809      * @throws MusicQueryException
810      */
811     public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
812             PreparedQueryObject queryObject, Condition conditionInfo)
813             throws MusicLockingException, MusicQueryException, MusicServiceException {
814         long start = System.currentTimeMillis();
815         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
816         String lockId = createLockReferenceAtomic(fullyQualifiedKey, LockType.WRITE);
817         long lockCreationTime = System.currentTimeMillis();
818         ReturnType lockAcqResult = null;
819         logger.info(EELFLoggerDelegate.applicationLogger,
820                 "***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : " + primaryKey);
821         logger.info(EELFLoggerDelegate.applicationLogger,
822                 "***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
823         if (conditionInfo != null) {
824             logger.info(EELFLoggerDelegate.applicationLogger,
825                     "***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
826         }
827         try {
828             lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
829         } catch (MusicLockingException ex) {
830             logger.error(EELFLoggerDelegate.errorLogger,
831                     "Exception while acquireLockWithLease() in atomic put for key: " + primaryKey);
832             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
833             throw new MusicServiceException(
834                     "Cannot perform atomic put for key: " + primaryKey + " : " + ex.getMessage());
835         }
836         long lockAcqTime = System.currentTimeMillis();
837
838         /*
839          * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { logger.info(EELFLoggerDelegate.
840          * applicationLogger,"unable to acquire lock, id " + lockId);
841          * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
842          */
843
844         logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
845         String lockRef = lockId.substring(lockId.lastIndexOf("$"));
846         ReturnType criticalPutResult = null;
847         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
848             criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef, conditionInfo);
849             long criticalPutTime = System.currentTimeMillis();
850             long lockDeleteTime = System.currentTimeMillis();
851             String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
852                     + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
853                     + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
854             criticalPutResult.setTimingInfo(timingInfo);
855         } else {
856             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
857             criticalPutResult = lockAcqResult;
858         }
859         try {
860             voluntaryReleaseLock(fullyQualifiedKey, lockId);
861         } catch (MusicLockingException ex) {
862             logger.info(EELFLoggerDelegate.applicationLogger,
863                     "Exception occured while deleting lock after atomic put for key: " + primaryKey);
864             criticalPutResult.setMessage(criticalPutResult.getMessage() + "Lock release failed");
865         }
866         return criticalPutResult;
867     }
868
869
870
871     /**
872      * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
873      *
874      * @param keyspaceName name of the keyspace
875      * @param tableName name of the table
876      * @param primaryKey primary key value
877      * @param queryObject query object containing prepared query and values
878      * @return ResultSet
879      * @throws MusicServiceException
880      * @throws MusicLockingException
881      * @throws MusicQueryException
882      */
883     public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
884             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
885         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
886         String lockId = createLockReferenceAtomic(fullyQualifiedKey, LockType.READ);
887         ReturnType lockAcqResult = null;
888         ResultSet result = null;
889         logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());
890         try {
891             lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
892         } catch (MusicLockingException ex) {
893             logger.error(EELFLoggerDelegate.errorLogger,
894                     "Exception while acquireLockWithLease() in atomic get for key: " + primaryKey);
895             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
896             throw new MusicServiceException(
897                     "Cannot perform atomic get for key: " + primaryKey + " : " + ex.getMessage());
898         }
899         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
900             logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
901             String lockRef = lockId.substring(lockId.lastIndexOf("$"));
902             result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
903         } else {
904             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
905         }
906         try {
907             voluntaryReleaseLock(fullyQualifiedKey, lockId);
908         } catch (MusicLockingException ex) {
909             logger.info(EELFLoggerDelegate.applicationLogger,
910                     "Exception occured while deleting lock after atomic put for key: " + primaryKey);
911             throw new MusicLockingException(ex.getMessage());
912         }
913
914         return result;
915     }
916
917
918
919     /**
920      * @param lockName
921      * @return
922      */
923     public Map<String, Object> validateLock(String lockName) {
924         return MusicUtil.validateLock(lockName);
925     }
926
927     @Override
928     @Deprecated
929     public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
930         PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
931         return null;
932     }
933
934     @Override
935     public List<String> getLockQueue(String fullyQualifiedKey)
936         throws MusicServiceException, MusicQueryException, MusicLockingException {
937         String[] splitString = fullyQualifiedKey.split("\\.");
938         String keyspace = splitString[0];
939         String table = splitString[1];
940         String primaryKeyValue = splitString[2];
941
942         return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
943     }
944     @Override
945     public long getLockQueueSize(String fullyQualifiedKey)
946         throws MusicServiceException, MusicQueryException, MusicLockingException {
947         String[] splitString = fullyQualifiedKey.split("\\.");
948         String keyspace = splitString[0];
949         String table = splitString[1];
950         String primaryKeyValue = splitString[2];
951
952         return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
953     }
954
955     @Override
956     @Deprecated
957     public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
958             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
959         //deprecated
960         return null;
961     }
962     
963     //Methods added for ORM changes
964     
965     public ResultType createKeyspace(JsonKeySpace jsonKeySpaceObject,String consistencyInfo) 
966             throws MusicServiceException,MusicQueryException {
967         ResultType result = nonKeyRelatedPut(jsonKeySpaceObject.genCreateKeyspaceQuery(), consistencyInfo);
968         logger.info(EELFLoggerDelegate.applicationLogger, " Keyspace Creation Process completed successfully");
969
970         return result;
971     }
972     
973     public ResultType dropKeyspace(JsonKeySpace jsonKeySpaceObject, String consistencyInfo) 
974             throws MusicServiceException,MusicQueryException {
975         ResultType result = nonKeyRelatedPut(jsonKeySpaceObject.genDropKeyspaceQuery(),
976                     consistencyInfo);
977         logger.info(EELFLoggerDelegate.applicationLogger, " Keyspace deletion Process completed successfully");
978         return result;
979     }
980     
981     public ResultType createTable(JsonTable jsonTableObject, String consistencyInfo) 
982             throws MusicServiceException, MusicQueryException {
983         ResultType result = null;
984         try {
985             result = createTable(jsonTableObject.getKeyspaceName(), 
986                     jsonTableObject.getTableName(), jsonTableObject.genCreateTableQuery(), consistencyInfo);
987             
988         } catch (MusicServiceException ex) {
989             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
990                     ErrorTypes.MUSICSERVICEERROR);
991             throw new MusicServiceException(ex.getMessage());
992         }
993         logger.info(EELFLoggerDelegate.applicationLogger, " Table Creation Process completed successfully ");
994         return result;
995     }
996     
997     public ResultType dropTable(JsonTable jsonTableObject,String consistencyInfo) 
998             throws MusicServiceException,MusicQueryException {
999         ResultType result = nonKeyRelatedPut(jsonTableObject.genDropTableQuery(),
1000                     consistencyInfo);
1001         logger.info(EELFLoggerDelegate.applicationLogger, " Table deletion Process completed successfully ");
1002         
1003         return result;
1004     }
1005     
1006     @Override
1007     public ResultType createIndex(JsonIndex jsonIndexObject, String consistencyInfo) 
1008             throws MusicServiceException, MusicQueryException{
1009         ResultType result = nonKeyRelatedPut(jsonIndexObject.genCreateIndexQuery(),
1010                     consistencyInfo);
1011         
1012         logger.info(EELFLoggerDelegate.applicationLogger, " Index creation Process completed successfully ");
1013         return result;
1014     }
1015     
1016     /**
1017      * This method performs DDL operation on cassandra.
1018      *
1019      * @param queryObject query object containing prepared query and values
1020      * @return ResultSet
1021      * @throws MusicServiceException
1022      */
1023     public  ResultSet select(JsonSelect jsonSelect, MultivaluedMap<String, String> rowParams) 
1024             throws MusicServiceException, MusicQueryException {
1025         ResultSet results = null;
1026         try {
1027             results = get(jsonSelect.genSelectQuery(rowParams));
1028         } catch (MusicServiceException e) {
1029             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
1030             throw new MusicServiceException(e.getMessage());
1031         }
1032         return results;
1033     }
1034     
1035     /**
1036      * Select Critical
1037      */
1038     public ResultSet selectCritical(JsonInsert jsonInsertObj, MultivaluedMap<String, String> rowParams)
1039             throws MusicLockingException, MusicQueryException, MusicServiceException {
1040         
1041         ResultSet results = null;
1042         String consistency = "";
1043         if(null != jsonInsertObj && null != jsonInsertObj.getConsistencyInfo()) {
1044             consistency = jsonInsertObj.getConsistencyInfo().get("type");
1045         }
1046         
1047         String lockId = jsonInsertObj.getConsistencyInfo().get("lockId");
1048         
1049         PreparedQueryObject queryObject = jsonInsertObj.genSelectCriticalPreparedQueryObj(rowParams);
1050         
1051         if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
1052             results = criticalGet(jsonInsertObj.getKeyspaceName(), jsonInsertObj.getTableName(), 
1053                     jsonInsertObj.getPrimaryKeyVal(), queryObject,lockId);
1054         } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
1055             results = atomicGet(jsonInsertObj.getKeyspaceName(), jsonInsertObj.getTableName(),
1056                     jsonInsertObj.getPrimaryKeyVal(), queryObject);
1057         }
1058         
1059         return results;
1060     }
1061     
1062     /**
1063      * this is insert row into Table
1064      */
1065     public ReturnType insertIntoTable(JsonInsert jsonInsertObj)
1066             throws MusicLockingException, MusicQueryException, MusicServiceException {
1067         
1068         String consistency = "";
1069         if(null != jsonInsertObj && null != jsonInsertObj.getConsistencyInfo()) {
1070             consistency = jsonInsertObj.getConsistencyInfo().get("type");
1071         }
1072         
1073         ReturnType result = null;
1074         
1075         try {
1076             PreparedQueryObject queryObj = null;
1077             queryObj = jsonInsertObj.genInsertPreparedQueryObj();
1078             
1079             if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
1080                 result = eventualPut(jsonInsertObj.genInsertPreparedQueryObj());
1081             } else if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
1082                 String lockId = jsonInsertObj.getConsistencyInfo().get("lockId");
1083                 if(lockId == null) {
1084                     logger.error(EELFLoggerDelegate.errorLogger,"LockId cannot be null. Create lock reference or"
1085                             + " use ATOMIC instead of CRITICAL", ErrorSeverity.FATAL, ErrorTypes.MUSICSERVICEERROR);
1086                     return new ReturnType(ResultType.FAILURE, "LockId cannot be null. Create lock "
1087                             + "and acquire lock or use ATOMIC instead of CRITICAL");
1088                 }
1089                 result = criticalPut(jsonInsertObj.getKeyspaceName(), 
1090                         jsonInsertObj.getTableName(), jsonInsertObj.getPrimaryKeyVal(), jsonInsertObj.genInsertPreparedQueryObj(), lockId,null);
1091             } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
1092                 result = atomicPut(jsonInsertObj.getKeyspaceName(), jsonInsertObj.getTableName(), 
1093                         jsonInsertObj.getPrimaryKeyVal(), jsonInsertObj.genInsertPreparedQueryObj(), null);
1094             }
1095         } catch (Exception ex) {
1096             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
1097                 .WARN, ErrorTypes.MUSICSERVICEERROR, ex);
1098             return new ReturnType(ResultType.FAILURE, ex.getMessage());
1099         }
1100         
1101         return result;
1102     }
1103     
1104      /**
1105      * This is insert row into Table
1106      */
1107     public ReturnType updateTable(JsonUpdate jsonUpdateObj, MultivaluedMap<String, String> rowParams)
1108             throws MusicLockingException, MusicQueryException, MusicServiceException {
1109         
1110         ReturnType result = null;
1111         String consistency = "";
1112         if(null != jsonUpdateObj && null != jsonUpdateObj.getConsistencyInfo()) {
1113             consistency = jsonUpdateObj.getConsistencyInfo().get("type");
1114         }
1115         PreparedQueryObject queryObject = jsonUpdateObj.genUpdatePreparedQueryObj(rowParams);
1116         
1117         Condition conditionInfo;
1118         if (jsonUpdateObj.getConditions() == null) {
1119             conditionInfo = null;
1120         } else {
1121             // to avoid parsing repeatedly, just send the select query to obtain row
1122             PreparedQueryObject selectQuery = new PreparedQueryObject();
1123             selectQuery.appendQueryString("SELECT *  FROM " + jsonUpdateObj.getKeyspaceName() + "." + jsonUpdateObj.getTableName() + " WHERE "
1124                 + jsonUpdateObj.getRowIdString() + ";");
1125             selectQuery.addValue(jsonUpdateObj.getPrimarKeyValue());
1126             conditionInfo = new Condition(jsonUpdateObj.getConditions(), selectQuery);
1127         }
1128
1129         
1130         if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
1131             result = eventualPut(queryObject);
1132         } else if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
1133             String lockId = jsonUpdateObj.getConsistencyInfo().get("lockId");
1134             if(lockId == null) {
1135                 logger.error(EELFLoggerDelegate.errorLogger,"LockId cannot be null. Create lock reference or"
1136                         + " use ATOMIC instead of CRITICAL", ErrorSeverity.FATAL, ErrorTypes.MUSICSERVICEERROR);
1137                
1138                 return new ReturnType(ResultType.FAILURE, "LockId cannot be null. Create lock "
1139                         + "and acquire lock or use ATOMIC instead of CRITICAL");
1140             }
1141             result = criticalPut(jsonUpdateObj.getKeyspaceName(), jsonUpdateObj.getTableName(), jsonUpdateObj.getPrimarKeyValue(),
1142                             queryObject, lockId, conditionInfo);
1143         } else if (consistency.equalsIgnoreCase("atomic_delete_lock")) {
1144             // this function is mainly for the benchmarks
1145             try {
1146                 result = atomicPutWithDeleteLock(jsonUpdateObj.getKeyspaceName(), jsonUpdateObj.getTableName(),
1147                         jsonUpdateObj.getPrimarKeyValue(), queryObject, conditionInfo);
1148             } catch (MusicLockingException e) {
1149                 logger.error(EELFLoggerDelegate.errorLogger,e, AppMessages.UNKNOWNERROR  ,ErrorSeverity.WARN,
1150                     ErrorTypes.GENERALSERVICEERROR, e);
1151                 throw new MusicLockingException(AppMessages.UNKNOWNERROR.toString());
1152                 
1153             }
1154         } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
1155             try {
1156                 result = atomicPut(jsonUpdateObj.getKeyspaceName(), jsonUpdateObj.getTableName(), jsonUpdateObj.getPrimarKeyValue(),
1157                     queryObject, conditionInfo);
1158             } catch (MusicLockingException e) {
1159                 logger.error(EELFLoggerDelegate.errorLogger,e, AppMessages.UNKNOWNERROR  ,ErrorSeverity.WARN, ErrorTypes.GENERALSERVICEERROR, e);
1160                 throw new MusicLockingException(AppMessages.UNKNOWNERROR.toString());
1161             }
1162         } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL_NB)) {
1163             try {
1164                 result = eventualPut_nb(queryObject, jsonUpdateObj.getKeyspaceName(),
1165                         jsonUpdateObj.getTableName(), jsonUpdateObj.getPrimarKeyValue());
1166             }catch (Exception e) {
1167                 return new ReturnType(ResultType.FAILURE, e.getMessage());
1168             }
1169             
1170         }
1171         
1172         return result;
1173     }
1174     
1175     /**
1176      * This method is for Delete From Table
1177      */
1178     public ReturnType deleteFromTable(JsonDelete jsonDeleteObj, MultivaluedMap<String, String> rowParams)
1179             throws MusicLockingException, MusicQueryException, MusicServiceException {
1180         
1181         ReturnType result = null;
1182         String consistency = "";
1183         if(null != jsonDeleteObj && null != jsonDeleteObj.getConsistencyInfo()) {
1184             consistency = jsonDeleteObj.getConsistencyInfo().get("type");
1185         }
1186         PreparedQueryObject queryObject = jsonDeleteObj.genDeletePreparedQueryObj(rowParams);
1187         
1188         // get the conditional, if any
1189         Condition conditionInfo;
1190         if (jsonDeleteObj.getConditions() == null) {
1191             conditionInfo = null;
1192         } else {
1193             // to avoid parsing repeatedly, just send the select query to obtain row
1194             PreparedQueryObject selectQuery = new PreparedQueryObject();
1195             selectQuery.appendQueryString("SELECT *  FROM " + jsonDeleteObj.getKeyspaceName() + "." + jsonDeleteObj.getTableName() + " WHERE "
1196                 + jsonDeleteObj.getRowIdString() + ";");
1197             selectQuery.addValue(jsonDeleteObj.getPrimarKeyValue());
1198             conditionInfo = new Condition(jsonDeleteObj.getConditions(), selectQuery);
1199         }
1200         
1201         if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL))
1202             result = eventualPut(queryObject);
1203         else if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
1204             String lockId = jsonDeleteObj.getConsistencyInfo().get("lockId");
1205             if(lockId == null) {
1206                 logger.error(EELFLoggerDelegate.errorLogger,"LockId cannot be null. Create lock reference or"
1207                     + " use ATOMIC instead of CRITICAL", ErrorSeverity.FATAL, ErrorTypes.MUSICSERVICEERROR);
1208                
1209                 return new ReturnType(ResultType.FAILURE, "LockId cannot be null. Create lock "
1210                         + "and acquire lock or use ATOMIC instead of CRITICAL");
1211             }
1212             result = criticalPut(jsonDeleteObj.getKeyspaceName(), 
1213                     jsonDeleteObj.getTableName(), jsonDeleteObj.getPrimarKeyValue(),
1214                 queryObject, lockId, conditionInfo);
1215         } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
1216             result = atomicPut(jsonDeleteObj.getKeyspaceName(), 
1217                     jsonDeleteObj.getTableName(), jsonDeleteObj.getPrimarKeyValue(),
1218                 queryObject, conditionInfo);
1219         } else if(consistency.equalsIgnoreCase(MusicUtil.EVENTUAL_NB)) {                    
1220             result = eventualPut_nb(queryObject, jsonDeleteObj.getKeyspaceName(), 
1221                     jsonDeleteObj.getTableName(), jsonDeleteObj.getPrimarKeyValue());
1222         }
1223         
1224         return result;
1225     }
1226
1227
1228 }