Merge "Bug fixes, syncronization, and clean up daemon"
[music.git] / music-core / src / main / java / org / onap / music / service / impl / MusicCassaCore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  *  Modifications Copyright (c) 2018 IBM. 
7  * ===================================================================
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.service.impl;
27
28 import java.io.StringWriter;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.HashSet;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Set;
35 import java.util.StringTokenizer;
36 import java.util.concurrent.atomic.AtomicInteger;
37 import javax.ws.rs.core.MultivaluedMap;
38
39 import org.onap.music.datastore.Condition;
40 import org.onap.music.datastore.MusicDataStore;
41 import org.onap.music.datastore.MusicDataStoreHandle;
42 import org.onap.music.datastore.PreparedQueryObject;
43 import org.onap.music.datastore.jsonobjects.JsonDelete;
44 import org.onap.music.datastore.jsonobjects.JsonIndex;
45 import org.onap.music.datastore.jsonobjects.JsonInsert;
46 import org.onap.music.datastore.jsonobjects.JsonKeySpace;
47 import org.onap.music.datastore.jsonobjects.JsonSelect;
48 import org.onap.music.datastore.jsonobjects.JsonTable;
49 import org.onap.music.datastore.jsonobjects.JsonUpdate;
50 import org.onap.music.eelf.logging.EELFLoggerDelegate;
51 import org.onap.music.eelf.logging.format.AppMessages;
52 import org.onap.music.eelf.logging.format.ErrorSeverity;
53 import org.onap.music.eelf.logging.format.ErrorTypes;
54 import org.onap.music.exceptions.MusicDeadlockException;
55 import org.onap.music.exceptions.MusicLockingException;
56 import org.onap.music.exceptions.MusicQueryException;
57 import org.onap.music.exceptions.MusicServiceException;
58 import org.onap.music.lockingservice.cassandra.CassaLockStore;
59 import org.onap.music.lockingservice.cassandra.CassaLockStore.LockObject;
60 import org.onap.music.lockingservice.cassandra.LockType;
61 import org.onap.music.lockingservice.cassandra.MusicLockState;
62 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
63 import org.onap.music.main.MusicUtil;
64 import org.onap.music.main.ResultType;
65 import org.onap.music.main.ReturnType;
66 import org.onap.music.service.MusicCoreService;
67
68 import com.datastax.driver.core.DataType;
69 import com.datastax.driver.core.ResultSet;
70 import com.datastax.driver.core.Row;
71 import com.datastax.driver.core.TableMetadata;
72
73 public class MusicCassaCore implements MusicCoreService {
74
75     private static CassaLockStore mLockHandle = null;
76     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
77     private static MusicCassaCore musicCassaCoreInstance = null;
78     private static Set<String> set = Collections.synchronizedSet(new HashSet<String>());
79     HashMap<String, Integer> map = new HashMap<>();
80     AtomicInteger wait = new AtomicInteger(0);
81     
82     private MusicCassaCore() {
83         // not going to happen
84     }
85     
86     public static CassaLockStore getmLockHandle() {
87         return mLockHandle;
88     }
89
90     public static void setmLockHandle(CassaLockStore mLockHandle) {
91         MusicCassaCore.mLockHandle = mLockHandle;
92     }
93     
94     public static MusicCassaCore getInstance() {
95
96         if(musicCassaCoreInstance == null) {
97             musicCassaCoreInstance = new MusicCassaCore();
98         }
99         return musicCassaCoreInstance;
100     }
101
102     public static CassaLockStore getLockingServiceHandle() throws MusicLockingException {
103         logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
104         long start = System.currentTimeMillis();
105
106         if (mLockHandle == null) {
107             try {
108                 mLockHandle = new CassaLockStore(MusicDataStoreHandle.getDSHandle());
109             } catch (Exception e) {
110                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
111                 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
112             }
113         }
114         long end = System.currentTimeMillis();
115         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
116         return mLockHandle;
117     }
118
119     public String createLockReferenceAtomic(String fullyQualifiedKey) throws MusicLockingException {
120         return createLockReferenceAtomic(fullyQualifiedKey, LockType.WRITE);
121     }
122     public String createLockReference(String fullyQualifiedKey, String owner) throws MusicLockingException {
123         return createLockReference(fullyQualifiedKey, LockType.WRITE, owner);
124     }
125     
126     
127     /**
128      * This will be called for Atomic calls
129      * it ensures that only one thread tries to create a lock on each key at a time
130      */
131     public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
132         String[] splitString = fullyQualifiedKey.split("\\.");
133         if (splitString.length < 3) {
134             throw new MusicLockingException("Missing or incorrect lock details. Check table or key name.");
135         }
136         String keyspace = splitString[0];
137         String table = splitString[1];
138         String lockName = splitString[2];
139
140         logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
141         long start = 0L;
142         long end = 0L;
143         String lockReference = null;
144         LockObject peek = null;
145
146         /** Lets check for an existing lock. 
147          * This will allow us to limit the amount of requests going forward.
148          */
149         start = System.currentTimeMillis();
150         try {
151             peek = getLockingServiceHandle().peekLockQueue(keyspace, table, lockName);
152         } catch (MusicServiceException | MusicQueryException e) {
153             //logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(),e);
154             throw new MusicLockingException("Error getting lockholder info for key [" + lockName +"]:" + e.getMessage());
155         }
156         
157         if(peek!=null && (peek.getLocktype()!=null && peek.getLocktype().equals(LockType.WRITE)) && peek.getAcquireTime()!=null && peek.getLockRef()!=null) {
158             long currentTime = System.currentTimeMillis();
159             if((currentTime-Long.parseLong(peek.getAcquireTime()))<MusicUtil.getDefaultLockLeasePeriod()){
160                 //logger.info(EELFLoggerDelegate.applicationLogger,"Lock holder exists and lease not expired. Please try again for key="+lockName);
161                 throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again.");
162             }
163         }
164         end = System.currentTimeMillis();
165         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to check for lock reference for key [" + lockName + "]:" + (end - start) + " ms");
166
167         start = System.currentTimeMillis();
168         /* We are Creating a Thread safe set and adding the key to the set. 
169         * if a key exists then it wil be passed over and not go to the lock creation. 
170         * If a key doesn't exist then it will set the value in the set and continue to create a lock. 
171         *
172         * This will ensure that no 2 threads using the same key will be able to try to create a lock
173         * This wil in turn squash the amout of LWT Chatter in Cassandra an reduce the amount of
174         * WriteTimeoutExceptions being experiences on single keys.
175         */
176         if ( set.add(fullyQualifiedKey)) {
177             try {
178                 lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype,null);
179                 set.remove(fullyQualifiedKey);
180             } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
181                 set.remove(fullyQualifiedKey);
182                 throw new MusicLockingException(e.getMessage());
183             } catch (Exception e) {
184                 set.remove(fullyQualifiedKey);
185                 e.printStackTrace();
186                 logger.error(EELFLoggerDelegate.applicationLogger,"Exception in creatLockEnforced:"+ e.getMessage(),e);
187                 throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. " + e.getMessage());
188             }
189         } else {
190             throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again.");
191         }
192         end = System.currentTimeMillis();
193         logger.info(EELFLoggerDelegate.debugLogger,"### Set = " + set);
194         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference  for key [" + lockName + "]:" + (end - start) + " ms");
195         return lockReference;
196
197         //return createLockReference(fullyQualifiedKey, locktype, null);
198     }
199
200     public String createLockReference(String fullyQualifiedKey, LockType locktype, String owner) throws MusicLockingException {
201         String[] splitString = fullyQualifiedKey.split("\\.");
202         if (splitString.length < 3) {
203             throw new MusicLockingException("Missing or incorrect lock details. Check table or key name.");
204         }
205         String keyspace = splitString[0];
206         String table = splitString[1];
207         String lockName = splitString[2];
208
209         logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
210         long start = 0L;
211         long end = 0L;
212         String lockReference = null;
213
214         /* Check for a Deadlock */
215         try {
216             boolean deadlock = getLockingServiceHandle().checkForDeadlock(keyspace, table, lockName, locktype, owner, false);
217             if (deadlock) {
218                 MusicDeadlockException e = new MusicDeadlockException("Deadlock detected when " + owner + " tried to create lock on " + keyspace + "." + table + "." + lockName);
219                 e.setValues(owner, keyspace, table, lockName);
220                 throw e;
221             }
222         } catch (MusicDeadlockException e) {
223             //just threw this, no need to wrap it
224             throw e;
225         } catch (MusicServiceException | MusicQueryException e) {
226             logger.error(EELFLoggerDelegate.applicationLogger, e);
227             throw new MusicLockingException("Unable to check for deadlock. " + e.getMessage(), e);
228         }
229         end = System.currentTimeMillis();
230         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to check for deadlock for key [" + lockName + "]:" + (end - start) + " ms");
231
232         start = System.currentTimeMillis();
233         try {
234             lockReference = "" + getLockingServiceHandle().genLockRefandEnQueue(keyspace, table, lockName, locktype, owner);
235         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
236             logger.info(EELFLoggerDelegate.applicationLogger,e.getMessage(),e);
237             throw new MusicLockingException("Unable to create lock reference for key [" + lockName + "]. Please try again: " + e.getMessage());
238         } catch (Exception e) {
239             logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(),e);
240             throw new MusicLockingException("Unable to create lock reference. " + e.getMessage(), e);
241         }
242         end = System.currentTimeMillis();
243         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference  for key [" + lockName + "]:" + (end - start) + " ms");
244         return lockReference;
245     }
246     
247     public ReturnType promoteLock(String lockId) throws MusicLockingException {
248         String[] splitString = lockId.split("\\.");
249         String keyspace = splitString[0].substring(1);//remove '$'
250         String table = splitString[1];
251         String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
252         String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
253         
254         logger.info(EELFLoggerDelegate.applicationLogger,"Attempting to promote lock " + lockId);
255
256         try {
257             return getLockingServiceHandle().promoteLock(keyspace, table, primaryKeyValue, lockRef);
258         } catch (MusicServiceException e) {
259             throw new MusicLockingException("Unable to promote lock. ", e);
260         } catch (MusicQueryException e) {
261             throw new MusicLockingException("Unable to promote lock. ", e);
262         }
263         
264     }
265
266
267     public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
268             throws MusicLockingException, MusicQueryException, MusicServiceException  {
269         evictExpiredLockHolder(fullyQualifiedKey,leasePeriod);
270         return acquireLock(fullyQualifiedKey, lockReference);
271     }
272
273     private void evictExpiredLockHolder(String fullyQualifiedKey, long leasePeriod)
274             throws MusicLockingException, MusicQueryException, MusicServiceException {
275         String[] splitString = fullyQualifiedKey.split("\\.");
276         String keyspace = splitString[0];
277         String table = splitString[1];
278         String primaryKeyValue = splitString[2];
279
280         LockObject currentLockHolderObject = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
281
282         if (!currentLockHolderObject.getIsLockOwner()) { // no lock holder
283             return;
284         }
285         /*
286          * Release the lock of the previous holder if it has expired. if the update to the acquire time has
287          * not reached due to network delays, simply use the create time as the reference
288          */
289         long referenceTime = Math.max(Long.parseLong(currentLockHolderObject.getAcquireTime()),
290                 Long.parseLong(currentLockHolderObject.getCreateTime()));
291         if ((System.currentTimeMillis() - referenceTime) > leasePeriod) {
292             forciblyReleaseLock(fullyQualifiedKey, currentLockHolderObject.getLockRef() + "");
293             logger.info(EELFLoggerDelegate.applicationLogger, currentLockHolderObject.getLockRef() + " forcibly released");
294         }
295     }
296
297     public ReturnType acquireLock(String fullyQualifiedKey, String lockId)
298             throws MusicLockingException, MusicQueryException, MusicServiceException {
299         String[] splitString = lockId.split("\\.");
300         String keyspace = splitString[0].substring(1);//remove '$'
301         String table = splitString[1];
302         String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
303         String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
304         String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
305
306         LockObject lockInfo = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue, lockRef);
307
308         if (!lockInfo.getIsLockOwner()) {
309             return new ReturnType(ResultType.FAILURE, lockId + " is not a lock holder");//not top of the lock store q
310         }
311         
312         if (getLockingServiceHandle().checkForDeadlock(keyspace, table, primaryKeyValue, lockInfo.getLocktype(), lockInfo.getOwner(), true)) {
313             MusicDeadlockException e = new MusicDeadlockException("Deadlock detected when " + lockInfo.getOwner()  + " tried to create lock on " + keyspace + "." + table + "." + primaryKeyValue);
314             e.setValues(lockInfo.getOwner(), keyspace, table, primaryKeyValue);
315             throw e;
316         }
317
318         //check to see if the value of the key has to be synced in case there was a forceful release
319         String syncTable = keyspace+".unsyncedKeys_"+table;
320         String query = "select * from "+syncTable+" where key='"+localFullyQualifiedKey+"';";
321         PreparedQueryObject readQueryObject = new PreparedQueryObject();
322         readQueryObject.appendQueryString(query);
323         ResultSet results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(readQueryObject);
324         if (!results.all().isEmpty()) {
325             logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
326             try {
327                 syncQuorum(keyspace, table, primaryKeyValue);
328             } catch (Exception e) {
329                 StringWriter sw = new StringWriter();
330                     logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",
331                         ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
332                 String exceptionAsString = sw.toString();
333                 return new ReturnType(ResultType.FAILURE, "Exception thrown while syncing key:\n" + exceptionAsString);
334             }
335             String cleanQuery = "delete from " + syncTable + " where key='"+localFullyQualifiedKey+"';";
336             PreparedQueryObject deleteQueryObject = new PreparedQueryObject();
337             deleteQueryObject.appendQueryString(cleanQuery);
338             MusicDataStoreHandle.getDSHandle().executePut(deleteQueryObject, "critical");
339         }
340
341         getLockingServiceHandle().updateLockAcquireTime(keyspace, table, primaryKeyValue, lockRef);
342
343         return new ReturnType(ResultType.SUCCESS, lockRef+" is the lock holder for the key");
344     }
345
346
347
348     /**
349      *
350      * @param tableQueryObject
351      * @param consistency
352      * @return Boolean Indicates success or failure
353      * @throws MusicServiceException
354      *
355      *
356      */
357     public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
358             String consistency) throws MusicServiceException {
359         boolean result = false;
360
361         try {
362             // create shadow locking table
363             result = getLockingServiceHandle().createLockQueue(keyspace, table);
364             if (result == false)
365                 return ResultType.FAILURE;
366
367             result = false;
368
369             // create table to track unsynced_keys
370             table = "unsyncedKeys_" + table;
371
372             String tabQuery =
373                     "CREATE TABLE IF NOT EXISTS " + keyspace + "." + table + " ( key text,PRIMARY KEY (key) );";
374             PreparedQueryObject queryObject = new PreparedQueryObject();
375
376             queryObject.appendQueryString(tabQuery);
377             result = false;
378             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
379
380             // create actual table
381             result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
382         } catch (MusicQueryException | MusicServiceException | MusicLockingException ex) {
383             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
384                     ErrorTypes.MUSICSERVICEERROR);
385             throw new MusicServiceException(ex.getMessage());
386         }
387         return result ? ResultType.SUCCESS : ResultType.FAILURE;
388     }
389
390     private static void syncQuorum(String keyspace, String table, String primaryKeyValue) throws Exception {
391         logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
392         PreparedQueryObject selectQuery = new PreparedQueryObject();
393         PreparedQueryObject updateQuery = new PreparedQueryObject();
394
395         // get the primary key d
396         TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, table);
397         String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName(); // we only support single
398                                                                             // primary key
399         DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
400         Object cqlFormattedPrimaryKeyValue =
401                         MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
402
403         // get the row of data from a quorum
404         selectQuery.appendQueryString("SELECT *  FROM " + keyspace + "." + table + " WHERE "
405                         + primaryKeyName + "= ?" + ";");
406         selectQuery.addValue(cqlFormattedPrimaryKeyValue);
407         MusicUtil.writeBackToQuorum(selectQuery, primaryKeyName, updateQuery, keyspace, table,
408             cqlFormattedPrimaryKeyValue);
409     }
410
411     /**
412      *
413      * @param query
414      * @return ResultSet
415      */
416     public ResultSet quorumGet(PreparedQueryObject query) {
417         ResultSet results = null;
418         try {
419             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
420         } catch (MusicServiceException | MusicQueryException e) {
421             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR,
422                 ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR, e);
423
424         }
425         return results;
426     }
427
428     public String whoseTurnIsIt(String fullyQualifiedKey) {
429         String[] splitString = fullyQualifiedKey.split("\\.");
430         String keyspace = splitString[0];
431         String table = splitString[1];
432         String primaryKeyValue = splitString[2];
433         try {
434             LockObject lockOwner = getLockingServiceHandle().peekLockQueue(keyspace, table, primaryKeyValue);
435             if (!lockOwner.getIsLockOwner()) {
436                 return null;
437             }
438             return "$" + fullyQualifiedKey + "$" + lockOwner.getLockRef();
439         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
440             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.LOCKINGERROR + fullyQualifiedKey,
441                     ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
442         }
443         return null;
444     }
445     
446     public List<String> getCurrentLockHolders(String fullyQualifiedKey) {
447         String[] splitString = fullyQualifiedKey.split("\\.");
448         String keyspace = splitString[0];
449         String table = splitString[1];
450         String primaryKeyValue = splitString[2];
451         try {
452             return getLockingServiceHandle().getCurrentLockHolders(keyspace, table, primaryKeyValue);
453         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
454             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+fullyQualifiedKey ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
455         }
456         return null;
457     }
458
459     /**
460      *
461      * @param lockReference
462      * @return
463      */
464     public static String getLockNameFromId(String lockReference) {
465         StringTokenizer st = new StringTokenizer(lockReference);
466         return st.nextToken("$");
467     }
468
469     @Override
470     public void destroyLockRef(String lockId) throws MusicLockingException {
471         long start = System.currentTimeMillis();
472         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
473         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
474         String[] splitString = fullyQualifiedKey.split("\\.");
475         String keyspace = splitString[0];
476         String table = splitString[1];
477         String primaryKeyValue = splitString[2];
478         try {
479             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockRef,MusicUtil.getRetryCount());
480         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
481             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockRef,
482                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR, e);
483             throw new MusicLockingException(e.getMessage());
484         }
485         long end = System.currentTimeMillis();
486         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
487     }
488
489     public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
490         long start = System.currentTimeMillis();
491         String[] splitString = fullyQualifiedKey.split("\\.");
492         String keyspace = splitString[0];
493         String table = splitString[1];
494         String primaryKeyValue = splitString[2];
495         try {
496             getLockingServiceHandle().deQueueLockRef(keyspace, table, primaryKeyValue, lockReference,MusicUtil.getRetryCount());
497         } catch (MusicLockingException | MusicServiceException | MusicQueryException e) {
498             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK + lockReference,
499                 ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR,e);
500             throw new MusicLockingException(e.getMessage());
501         }
502         long end = System.currentTimeMillis();
503         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
504         return new MusicLockState(LockStatus.UNLOCKED, "");
505     }
506
507     @Override
508     public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) throws MusicLockingException {
509         String fullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
510         String lockRef = lockId.substring(lockId.lastIndexOf('$')+1);
511         if (voluntaryRelease) {
512             return voluntaryReleaseLock(fullyQualifiedKey, lockRef);
513         } else {
514             return forciblyReleaseLock(fullyQualifiedKey, lockRef);
515         }
516     }
517
518     public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
519             throws MusicLockingException {
520         MusicLockState result = null;
521         try {
522             result = destroyLockRef(fullyQualifiedKey, lockReference);
523         } catch (Exception ex) {
524             logger.info(EELFLoggerDelegate.applicationLogger,
525                     "Exception in voluntaryReleaseLock() for " + fullyQualifiedKey + "ref: " + lockReference);
526             throw new MusicLockingException(ex.getMessage());
527         }
528         return result;
529     }
530
531     public MusicLockState forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException {
532         String[] splitString = fullyQualifiedKey.split("\\.");
533         String keyspace = splitString[0];
534         String table = splitString[1];
535
536         //leave a signal that this key could potentially be unsynchronized
537         String syncTable = keyspace+".unsyncedKeys_"+table;
538         PreparedQueryObject queryObject = new PreparedQueryObject();
539         String values = "(?)";
540         queryObject.addValue(fullyQualifiedKey);
541         String insQuery = "insert into "+syncTable+" (key) values "+values+";";
542         queryObject.appendQueryString(insQuery);
543         try {
544             MusicDataStoreHandle.getDSHandle().executePut(queryObject, "critical");
545         } catch (Exception e) {
546             logger.error("Cannot forcibly release lock: " + fullyQualifiedKey + " " + lockReference + ". "
547                         + e.getMessage(), e);
548         }
549
550         //now release the lock
551         return destroyLockRef(fullyQualifiedKey, lockReference);
552     }
553
554     @Override
555     public List<String> releaseAllLocksForOwner(String ownerId, String keyspace, String table) throws MusicLockingException, MusicServiceException, MusicQueryException {
556 //        System.out.println("IN RELEASEALLLOCKSFOROWNER, ");
557
558         List<String> lockIds = getLockingServiceHandle().getAllLocksForOwner(ownerId, keyspace, table);
559         for (String lockId : lockIds) {
560 //            System.out.println(" LOCKID = " + lockId);
561             //return "$" + keyspace + "." + table + "." + lockName + "$" + String.valueOf(lockRef);
562             releaseLock("$" + keyspace + "." + table + "." + lockId, true);
563         }
564         return lockIds;
565     }
566
567     /**
568      *
569      * @param lockName
570      * @throws MusicLockingException
571      */
572     @Deprecated
573     public  void deleteLock(String lockName) throws MusicLockingException {
574         throw new MusicLockingException("Depreciated Method Delete Lock");
575     }
576
577     // Prepared Query Additions.
578
579     /**
580      *
581      * @param queryObject
582      * @return ReturnType
583      * @throws MusicServiceException
584      */
585     public  ReturnType eventualPut(PreparedQueryObject queryObject) {
586         boolean result = false;
587         try {
588             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
589         } catch (MusicServiceException | MusicQueryException ex) {
590             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get Lock Handle "  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
591             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " " + ex);
592             return new ReturnType(ResultType.FAILURE, ex.getMessage());
593         }
594         if (result) {
595             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
596         } else {
597             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
598         }
599     }
600
601     /**
602      *
603      * @param queryObject
604      * @return ReturnType
605      * @throws MusicServiceException
606      */
607     public  ReturnType eventualPut_nb(PreparedQueryObject queryObject,String keyspace,String tablename,String primaryKey) {
608         boolean result = false;
609         long guard = 0;
610         PreparedQueryObject getGaurd = new PreparedQueryObject();
611         getGaurd.appendQueryString("SELECT guard FROM "+keyspace+".lockq_"+tablename+ " WHERE key = ? ;");
612         getGaurd.addValue(primaryKey);
613         try {
614             ResultSet getGaurdResult = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(getGaurd);
615             Row row = getGaurdResult.one();
616             if (row != null) {
617                 guard = row.getLong("guard");
618                 long timeOfWrite = System.currentTimeMillis();
619                 long ts = MusicUtil.v2sTimeStampInMicroseconds(guard, timeOfWrite);
620                 String query = queryObject.getQuery();
621                 if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
622                     if (queryObject.getOperation().equalsIgnoreCase("delete"))
623                         query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
624                     else
625                         query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
626                 }
627                 queryObject.replaceQueryString(query);
628             }
629
630         } catch (MusicServiceException | MusicQueryException e) {
631             logger.error(EELFLoggerDelegate.applicationLogger,e.getMessage(), e);
632         }
633         try {
634             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
635         } catch (MusicServiceException | MusicQueryException ex) {
636             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(),"[ERR512E] Failed to get Lock Handle ",
637                 ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
638             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " ", ex);
639             return new ReturnType(ResultType.FAILURE, ex.getMessage());
640         }
641         if (result) {
642             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
643         } else {
644             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
645         }
646     }
647
648     /**
649      *
650      * @param keyspace
651      * @param table
652      * @param primaryKeyValue
653      * @param queryObject
654      * @param lockId
655      * @return
656      */
657     public ReturnType criticalPut(String keyspace, String table, String primaryKeyValue,
658             PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
659         long start = System.currentTimeMillis();
660         try {
661             String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
662             if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
663                 return new ReturnType(ResultType.FAILURE,"Lock value '" + keyLock + "' and key value '"
664                 + primaryKeyValue + "' not match. Please check your values: " 
665                 + lockId + " .");
666             }
667             LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
668                     lockId.substring(lockId.lastIndexOf("$") + 1));
669
670             if ( lockObject == null ) {
671                 return new ReturnType(ResultType.FAILURE, lockId + " does not exist.");
672             } else if (!lockObject.getIsLockOwner()) {
673                 return new ReturnType(ResultType.FAILURE, lockId + " is not the lock holder");
674             } else if (lockObject.getLocktype() != LockType.WRITE) {
675                 return new ReturnType(ResultType.FAILURE,
676                         "Attempting to do write operation, but " + lockId + " is a read lock");
677             }
678
679             if (conditionInfo != null) {
680                 try {
681                     if (conditionInfo.testCondition() == false)
682                         return new ReturnType(ResultType.FAILURE, "Lock acquired but the condition is not true");
683                 } catch (Exception e) {
684                     logger.error(EELFLoggerDelegate.errorLogger, e);
685                     return new ReturnType(ResultType.FAILURE,
686                             "Exception thrown while checking the condition, check its sanctity:\n" + e.getMessage());
687                 }
688             }
689             String query = queryObject.getQuery();
690             long timeOfWrite = System.currentTimeMillis();
691             long lockOrdinal = Long.parseLong(lockId.substring(lockId.lastIndexOf("$") + 1));
692             long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
693             // TODO: use Statement instead of modifying query
694             if (!queryObject.getQuery().contains("USING TIMESTAMP")) {
695                 if (queryObject.getOperation().equalsIgnoreCase("delete"))
696                     query = query.replaceFirst("WHERE", " USING TIMESTAMP " + ts + " WHERE ");
697                 else if (queryObject.getOperation().equalsIgnoreCase("insert"))
698                     query = query.replaceFirst(";", " USING TIMESTAMP " + ts + " ; ");
699                 else
700                     query = query.replaceFirst("SET", "USING TIMESTAMP " + ts + " SET");
701             }
702             queryObject.replaceQueryString(query);
703             MusicDataStore dsHandle = MusicDataStoreHandle.getDSHandle();
704             dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
705             long end = System.currentTimeMillis();
706             logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
707         } catch (MusicQueryException | MusicServiceException | MusicLockingException  e) {
708             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), e);
709             return new ReturnType(ResultType.FAILURE,
710                 "Exception thrown while doing the critical put: "
711                 + e.getMessage());
712         }
713         return new ReturnType(ResultType.SUCCESS, "Update performed");
714     }
715
716
717     /**
718      *
719      * @param queryObject
720      * @param consistency
721      * @return Boolean Indicates success or failure
722      * @throws MusicServiceException
723      *
724      *
725      */
726     public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException,MusicQueryException {
727         // this is mainly for some functions like keyspace creation etc which does not
728         // really need the bells and whistles of Music locking.
729         boolean result = false;
730 //        try {
731         result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
732 //        } catch (MusicQueryException | MusicServiceException ex) {
733             // logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR,
734             //     ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR, ex);
735 //            throw new MusicServiceException(ex.getMessage(),ex);
736 //        }
737         return result ? ResultType.SUCCESS : ResultType.FAILURE;
738     }
739
740     /**
741      * This method performs DDL operation on cassandra.
742      *
743      * @param queryObject query object containing prepared query and values
744      * @return ResultSet
745      * @throws MusicServiceException
746      */
747     public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
748         ResultSet results = null;
749         try {
750             results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
751         } catch (MusicQueryException | MusicServiceException e) {
752             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
753             throw new MusicServiceException(e.getMessage());
754         }
755         return results;
756     }
757
758     /**
759      * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
760      * is used to check if the resource is free.
761      *
762      * @param keyspace name of the keyspace
763      * @param table name of the table
764      * @param primaryKeyValue primary key value
765      * @param queryObject query object containing prepared query and values
766      * @param lockId lock ID to check if the resource is free to perform the operation.
767      * @return ResultSet
768      */
769     public ResultSet criticalGet(String keyspace, String table, String primaryKeyValue,
770                     PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
771         ResultSet results = null;
772         String keyLock = lockId.substring(lockId.lastIndexOf(".") + 1,lockId.lastIndexOf("$"));
773         try {
774             if (lockId.contains(".") && !keyLock.equals(primaryKeyValue)) {
775                 throw new MusicLockingException("Lock value '" + keyLock + "' and key value '"
776                 + primaryKeyValue + "' do not match. Please check your values: " 
777                 + lockId + " .");
778             }
779             LockObject lockObject = getLockingServiceHandle().getLockInfo(keyspace, table, primaryKeyValue,
780                 lockId.substring(lockId.lastIndexOf("$") + 1));
781             if (null == lockObject) {
782                 throw new MusicLockingException("No Lock Object. Please check if lock name or key is correct." 
783                     + lockId + " .");
784             }
785             if ( !lockObject.getIsLockOwner()) {
786                 return null;// not top of the lock store q
787             }
788             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
789         } catch ( MusicLockingException e ) {
790             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
791                     .WARN, ErrorTypes.MUSICSERVICEERROR);
792                 throw new MusicServiceException(
793                     "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());
794         } catch (MusicQueryException | MusicServiceException e) {
795             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
796                     .WARN, ErrorTypes.MUSICSERVICEERROR, e);
797                 throw new MusicServiceException(
798                     "Cannot perform critical get for key: " + primaryKeyValue + " : " + e.getMessage());    
799         }
800         return results;
801     }
802
803     /**
804      * This method performs DML operation on cassandra, when the lock of the dd is acquired.
805      *
806      * @param keyspaceName name of the keyspace
807      * @param tableName name of the table
808      * @param primaryKey primary key value
809      * @param queryObject query object containing prepared query and values
810      * @return ReturnType
811      * @throws MusicLockingException
812      * @throws MusicServiceException
813      * @throws MusicQueryException
814      */
815     public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
816             PreparedQueryObject queryObject, Condition conditionInfo)
817             throws MusicLockingException, MusicQueryException, MusicServiceException {
818         long start = System.currentTimeMillis();
819         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
820         String lockId = createLockReferenceAtomic(fullyQualifiedKey, LockType.WRITE);
821         long lockCreationTime = System.currentTimeMillis();
822         ReturnType lockAcqResult = null;
823         logger.info(EELFLoggerDelegate.applicationLogger,
824                 "***Acquiring lock for atomicPut() query : " + queryObject.getQuery() + " : " + primaryKey);
825         logger.info(EELFLoggerDelegate.applicationLogger,
826                 "***Acquiring lock for atomicPut() values: " + queryObject.getValues().toString());
827         if (conditionInfo != null) {
828             logger.info(EELFLoggerDelegate.applicationLogger,
829                     "***Acquiring lock for atomicPut() conditions: " + conditionInfo.toString());
830         }
831         try {
832             lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
833         } catch (MusicLockingException ex) {
834             logger.error(EELFLoggerDelegate.errorLogger,
835                     "Exception while acquireLockWithLease() in atomic put for key: " + primaryKey);
836             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
837             throw new MusicServiceException(
838                     "Cannot perform atomic put for key: " + primaryKey + " : " + ex.getMessage());
839         }
840         long lockAcqTime = System.currentTimeMillis();
841
842         /*
843          * if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) { logger.info(EELFLoggerDelegate.
844          * applicationLogger,"unable to acquire lock, id " + lockId);
845          * voluntaryReleaseLock(fullyQualifiedKey,lockId); return lockAcqResult; }
846          */
847
848         logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
849         String lockRef = lockId.substring(lockId.lastIndexOf("$"));
850         ReturnType criticalPutResult = null;
851         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
852             criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey, queryObject, lockRef, conditionInfo);
853             long criticalPutTime = System.currentTimeMillis();
854             long lockDeleteTime = System.currentTimeMillis();
855             String timingInfo = "|lock creation time:" + (lockCreationTime - start) + "|lock accquire time:"
856                     + (lockAcqTime - lockCreationTime) + "|critical put time:" + (criticalPutTime - lockAcqTime)
857                     + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
858             criticalPutResult.setTimingInfo(timingInfo);
859         } else {
860             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
861             criticalPutResult = lockAcqResult;
862         }
863         try {
864             voluntaryReleaseLock(fullyQualifiedKey, lockId);
865         } catch (MusicLockingException ex) {
866             logger.info(EELFLoggerDelegate.applicationLogger,
867                     "Exception occured while deleting lock after atomic put for key: " + primaryKey);
868             criticalPutResult.setMessage(criticalPutResult.getMessage() + "Lock release failed");
869         }
870         return criticalPutResult;
871     }
872
873
874
875     /**
876      * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
877      *
878      * @param keyspaceName name of the keyspace
879      * @param tableName name of the table
880      * @param primaryKey primary key value
881      * @param queryObject query object containing prepared query and values
882      * @return ResultSet
883      * @throws MusicServiceException
884      * @throws MusicLockingException
885      * @throws MusicQueryException
886      */
887     public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
888             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException, MusicQueryException {
889         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
890         String lockId = createLockReferenceAtomic(fullyQualifiedKey, LockType.READ);
891         ReturnType lockAcqResult = null;
892         ResultSet result = null;
893         logger.info(EELFLoggerDelegate.applicationLogger, "Acquiring lock for atomicGet() : " + queryObject.getQuery());
894         try {
895             lockAcqResult = acquireLockWithLease(fullyQualifiedKey, lockId, MusicUtil.getDefaultLockLeasePeriod());
896         } catch (MusicLockingException ex) {
897             logger.error(EELFLoggerDelegate.errorLogger,
898                     "Exception while acquireLockWithLease() in atomic get for key: " + primaryKey);
899             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage());
900             throw new MusicServiceException(
901                     "Cannot perform atomic get for key: " + primaryKey + " : " + ex.getMessage());
902         }
903         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
904             logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
905             String lockRef = lockId.substring(lockId.lastIndexOf("$"));
906             result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockRef);
907         } else {
908             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
909         }
910         try {
911             voluntaryReleaseLock(fullyQualifiedKey, lockId);
912         } catch (MusicLockingException ex) {
913             logger.info(EELFLoggerDelegate.applicationLogger,
914                     "Exception occured while deleting lock after atomic put for key: " + primaryKey);
915             throw new MusicLockingException(ex.getMessage());
916         }
917
918         return result;
919     }
920
921
922
923     /**
924      * @param lockName
925      * @return
926      */
927     public Map<String, Object> validateLock(String lockName) {
928         return MusicUtil.validateLock(lockName);
929     }
930
931     @Override
932     @Deprecated
933     public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
934         PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
935         return null;
936     }
937
938     @Override
939     public List<String> getLockQueue(String fullyQualifiedKey)
940         throws MusicServiceException, MusicQueryException, MusicLockingException {
941         String[] splitString = fullyQualifiedKey.split("\\.");
942         String keyspace = splitString[0];
943         String table = splitString[1];
944         String primaryKeyValue = splitString[2];
945
946         return getLockingServiceHandle().getLockQueue(keyspace, table, primaryKeyValue);
947     }
948     @Override
949     public long getLockQueueSize(String fullyQualifiedKey)
950         throws MusicServiceException, MusicQueryException, MusicLockingException {
951         String[] splitString = fullyQualifiedKey.split("\\.");
952         String keyspace = splitString[0];
953         String table = splitString[1];
954         String primaryKeyValue = splitString[2];
955
956         return getLockingServiceHandle().getLockQueueSize(keyspace, table, primaryKeyValue);
957     }
958
959     @Override
960     @Deprecated
961     public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
962             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
963         //deprecated
964         return null;
965     }
966     
967     //Methods added for ORM changes
968     
969     public ResultType createKeyspace(JsonKeySpace jsonKeySpaceObject,String consistencyInfo) 
970             throws MusicServiceException,MusicQueryException {
971         ResultType result = nonKeyRelatedPut(jsonKeySpaceObject.genCreateKeyspaceQuery(), consistencyInfo);
972         logger.info(EELFLoggerDelegate.applicationLogger, " Keyspace Creation Process completed successfully");
973
974         return result;
975     }
976     
977     public ResultType dropKeyspace(JsonKeySpace jsonKeySpaceObject, String consistencyInfo) 
978             throws MusicServiceException,MusicQueryException {
979         ResultType result = nonKeyRelatedPut(jsonKeySpaceObject.genDropKeyspaceQuery(),
980                     consistencyInfo);
981         logger.info(EELFLoggerDelegate.applicationLogger, " Keyspace deletion Process completed successfully");
982         return result;
983     }
984     
985     public ResultType createTable(JsonTable jsonTableObject, String consistencyInfo) 
986             throws MusicServiceException, MusicQueryException {
987         ResultType result = null;
988         try {
989             result = createTable(jsonTableObject.getKeyspaceName(), 
990                     jsonTableObject.getTableName(), jsonTableObject.genCreateTableQuery(), consistencyInfo);
991             
992         } catch (MusicServiceException ex) {
993             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.WARN,
994                     ErrorTypes.MUSICSERVICEERROR);
995             throw new MusicServiceException(ex.getMessage());
996         }
997         logger.info(EELFLoggerDelegate.applicationLogger, " Table Creation Process completed successfully ");
998         return result;
999     }
1000     
1001     public ResultType dropTable(JsonTable jsonTableObject,String consistencyInfo) 
1002             throws MusicServiceException,MusicQueryException {
1003         ResultType result = nonKeyRelatedPut(jsonTableObject.genDropTableQuery(),
1004                     consistencyInfo);
1005         logger.info(EELFLoggerDelegate.applicationLogger, " Table deletion Process completed successfully ");
1006         
1007         return result;
1008     }
1009     
1010     @Override
1011     public ResultType createIndex(JsonIndex jsonIndexObject, String consistencyInfo) 
1012             throws MusicServiceException, MusicQueryException{
1013         ResultType result = nonKeyRelatedPut(jsonIndexObject.genCreateIndexQuery(),
1014                     consistencyInfo);
1015         
1016         logger.info(EELFLoggerDelegate.applicationLogger, " Index creation Process completed successfully ");
1017         return result;
1018     }
1019     
1020     /**
1021      * This method performs DDL operation on cassandra.
1022      *
1023      * @param queryObject query object containing prepared query and values
1024      * @return ResultSet
1025      * @throws MusicServiceException
1026      */
1027     public  ResultSet select(JsonSelect jsonSelect, MultivaluedMap<String, String> rowParams) 
1028             throws MusicServiceException, MusicQueryException {
1029         ResultSet results = null;
1030         try {
1031             results = get(jsonSelect.genSelectQuery(rowParams));
1032         } catch (MusicServiceException e) {
1033             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
1034             throw new MusicServiceException(e.getMessage());
1035         }
1036         return results;
1037     }
1038     
1039     /**
1040      * Select Critical
1041      */
1042     public ResultSet selectCritical(JsonInsert jsonInsertObj, MultivaluedMap<String, String> rowParams)
1043             throws MusicLockingException, MusicQueryException, MusicServiceException {
1044         
1045         ResultSet results = null;
1046         String consistency = "";
1047         if(null != jsonInsertObj && null != jsonInsertObj.getConsistencyInfo()) {
1048             consistency = jsonInsertObj.getConsistencyInfo().get("type");
1049         }
1050         
1051         String lockId = jsonInsertObj.getConsistencyInfo().get("lockId");
1052         
1053         PreparedQueryObject queryObject = jsonInsertObj.genSelectCriticalPreparedQueryObj(rowParams);
1054         
1055         if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
1056             results = criticalGet(jsonInsertObj.getKeyspaceName(), jsonInsertObj.getTableName(), 
1057                     jsonInsertObj.getPrimaryKeyVal(), queryObject,lockId);
1058         } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
1059             results = atomicGet(jsonInsertObj.getKeyspaceName(), jsonInsertObj.getTableName(),
1060                     jsonInsertObj.getPrimaryKeyVal(), queryObject);
1061         }
1062         
1063         return results;
1064     }
1065     
1066     /**
1067      * this is insert row into Table
1068      */
1069     public ReturnType insertIntoTable(JsonInsert jsonInsertObj)
1070             throws MusicLockingException, MusicQueryException, MusicServiceException {
1071         
1072         String consistency = "";
1073         if(null != jsonInsertObj && null != jsonInsertObj.getConsistencyInfo()) {
1074             consistency = jsonInsertObj.getConsistencyInfo().get("type");
1075         }
1076         
1077         ReturnType result = null;
1078         
1079         try {
1080             PreparedQueryObject queryObj = null;
1081             queryObj = jsonInsertObj.genInsertPreparedQueryObj();
1082             
1083             if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
1084                 result = eventualPut(jsonInsertObj.genInsertPreparedQueryObj());
1085             } else if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
1086                 String lockId = jsonInsertObj.getConsistencyInfo().get("lockId");
1087                 if(lockId == null) {
1088                     logger.error(EELFLoggerDelegate.errorLogger,"LockId cannot be null. Create lock reference or"
1089                             + " use ATOMIC instead of CRITICAL", ErrorSeverity.FATAL, ErrorTypes.MUSICSERVICEERROR);
1090                     return new ReturnType(ResultType.FAILURE, "LockId cannot be null. Create lock "
1091                             + "and acquire lock or use ATOMIC instead of CRITICAL");
1092                 }
1093                 result = criticalPut(jsonInsertObj.getKeyspaceName(), 
1094                         jsonInsertObj.getTableName(), jsonInsertObj.getPrimaryKeyVal(), jsonInsertObj.genInsertPreparedQueryObj(), lockId,null);
1095             } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
1096                 result = atomicPut(jsonInsertObj.getKeyspaceName(), jsonInsertObj.getTableName(), 
1097                         jsonInsertObj.getPrimaryKeyVal(), jsonInsertObj.genInsertPreparedQueryObj(), null);
1098             }
1099         } catch (Exception ex) {
1100             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity
1101                 .WARN, ErrorTypes.MUSICSERVICEERROR, ex);
1102             return new ReturnType(ResultType.FAILURE, ex.getMessage());
1103         }
1104         
1105         return result;
1106     }
1107     
1108      /**
1109      * This is insert row into Table
1110      */
1111     public ReturnType updateTable(JsonUpdate jsonUpdateObj, MultivaluedMap<String, String> rowParams)
1112             throws MusicLockingException, MusicQueryException, MusicServiceException {
1113         
1114         ReturnType result = null;
1115         String consistency = "";
1116         if(null != jsonUpdateObj && null != jsonUpdateObj.getConsistencyInfo()) {
1117             consistency = jsonUpdateObj.getConsistencyInfo().get("type");
1118         }
1119         PreparedQueryObject queryObject = jsonUpdateObj.genUpdatePreparedQueryObj(rowParams);
1120         
1121         Condition conditionInfo;
1122         if (jsonUpdateObj.getConditions() == null) {
1123             conditionInfo = null;
1124         } else {
1125             // to avoid parsing repeatedly, just send the select query to obtain row
1126             PreparedQueryObject selectQuery = new PreparedQueryObject();
1127             selectQuery.appendQueryString("SELECT *  FROM " + jsonUpdateObj.getKeyspaceName() + "." + jsonUpdateObj.getTableName() + " WHERE "
1128                 + jsonUpdateObj.getRowIdString() + ";");
1129             selectQuery.addValue(jsonUpdateObj.getPrimarKeyValue());
1130             conditionInfo = new Condition(jsonUpdateObj.getConditions(), selectQuery);
1131         }
1132
1133         
1134         if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
1135             result = eventualPut(queryObject);
1136         } else if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
1137             String lockId = jsonUpdateObj.getConsistencyInfo().get("lockId");
1138             if(lockId == null) {
1139                 logger.error(EELFLoggerDelegate.errorLogger,"LockId cannot be null. Create lock reference or"
1140                         + " use ATOMIC instead of CRITICAL", ErrorSeverity.FATAL, ErrorTypes.MUSICSERVICEERROR);
1141                
1142                 return new ReturnType(ResultType.FAILURE, "LockId cannot be null. Create lock "
1143                         + "and acquire lock or use ATOMIC instead of CRITICAL");
1144             }
1145             result = criticalPut(jsonUpdateObj.getKeyspaceName(), jsonUpdateObj.getTableName(), jsonUpdateObj.getPrimarKeyValue(),
1146                             queryObject, lockId, conditionInfo);
1147         } else if (consistency.equalsIgnoreCase("atomic_delete_lock")) {
1148             // this function is mainly for the benchmarks
1149             try {
1150                 result = atomicPutWithDeleteLock(jsonUpdateObj.getKeyspaceName(), jsonUpdateObj.getTableName(),
1151                         jsonUpdateObj.getPrimarKeyValue(), queryObject, conditionInfo);
1152             } catch (MusicLockingException e) {
1153                 logger.error(EELFLoggerDelegate.errorLogger,e, AppMessages.UNKNOWNERROR  ,ErrorSeverity.WARN,
1154                     ErrorTypes.GENERALSERVICEERROR, e);
1155                 throw new MusicLockingException(AppMessages.UNKNOWNERROR.toString());
1156                 
1157             }
1158         } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
1159             try {
1160                 result = atomicPut(jsonUpdateObj.getKeyspaceName(), jsonUpdateObj.getTableName(), jsonUpdateObj.getPrimarKeyValue(),
1161                     queryObject, conditionInfo);
1162             } catch (MusicLockingException e) {
1163                 logger.error(EELFLoggerDelegate.errorLogger,e, AppMessages.UNKNOWNERROR  ,ErrorSeverity.WARN, ErrorTypes.GENERALSERVICEERROR, e);
1164                 throw new MusicLockingException(AppMessages.UNKNOWNERROR.toString());
1165             }
1166         } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL_NB)) {
1167             try {
1168                 result = eventualPut_nb(queryObject, jsonUpdateObj.getKeyspaceName(),
1169                         jsonUpdateObj.getTableName(), jsonUpdateObj.getPrimarKeyValue());
1170             }catch (Exception e) {
1171                 return new ReturnType(ResultType.FAILURE, e.getMessage());
1172             }
1173             
1174         }
1175         
1176         return result;
1177     }
1178     
1179     /**
1180      * This method is for Delete From Table
1181      */
1182     public ReturnType deleteFromTable(JsonDelete jsonDeleteObj, MultivaluedMap<String, String> rowParams)
1183             throws MusicLockingException, MusicQueryException, MusicServiceException {
1184         
1185         ReturnType result = null;
1186         String consistency = "";
1187         if(null != jsonDeleteObj && null != jsonDeleteObj.getConsistencyInfo()) {
1188             consistency = jsonDeleteObj.getConsistencyInfo().get("type");
1189         }
1190         PreparedQueryObject queryObject = jsonDeleteObj.genDeletePreparedQueryObj(rowParams);
1191         
1192         // get the conditional, if any
1193         Condition conditionInfo;
1194         if (jsonDeleteObj.getConditions() == null) {
1195             conditionInfo = null;
1196         } else {
1197             // to avoid parsing repeatedly, just send the select query to obtain row
1198             PreparedQueryObject selectQuery = new PreparedQueryObject();
1199             selectQuery.appendQueryString("SELECT *  FROM " + jsonDeleteObj.getKeyspaceName() + "." + jsonDeleteObj.getTableName() + " WHERE "
1200                 + jsonDeleteObj.getRowIdString() + ";");
1201             selectQuery.addValue(jsonDeleteObj.getPrimarKeyValue());
1202             conditionInfo = new Condition(jsonDeleteObj.getConditions(), selectQuery);
1203         }
1204         
1205         if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL))
1206             result = eventualPut(queryObject);
1207         else if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
1208             String lockId = jsonDeleteObj.getConsistencyInfo().get("lockId");
1209             if(lockId == null) {
1210                 logger.error(EELFLoggerDelegate.errorLogger,"LockId cannot be null. Create lock reference or"
1211                     + " use ATOMIC instead of CRITICAL", ErrorSeverity.FATAL, ErrorTypes.MUSICSERVICEERROR);
1212                
1213                 return new ReturnType(ResultType.FAILURE, "LockId cannot be null. Create lock "
1214                         + "and acquire lock or use ATOMIC instead of CRITICAL");
1215             }
1216             result = criticalPut(jsonDeleteObj.getKeyspaceName(), 
1217                     jsonDeleteObj.getTableName(), jsonDeleteObj.getPrimarKeyValue(),
1218                 queryObject, lockId, conditionInfo);
1219         } else if (consistency.equalsIgnoreCase(MusicUtil.ATOMIC)) {
1220             result = atomicPut(jsonDeleteObj.getKeyspaceName(), 
1221                     jsonDeleteObj.getTableName(), jsonDeleteObj.getPrimarKeyValue(),
1222                 queryObject, conditionInfo);
1223         } else if(consistency.equalsIgnoreCase(MusicUtil.EVENTUAL_NB)) {                    
1224             result = eventualPut_nb(queryObject, jsonDeleteObj.getKeyspaceName(), 
1225                     jsonDeleteObj.getTableName(), jsonDeleteObj.getPrimarKeyValue());
1226         }
1227         
1228         return result;
1229     }
1230
1231
1232 }