Push various changes
[music.git] / src / main / java / org / onap / music / service / impl / MusicZKCore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  *
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  *
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22
23 package org.onap.music.service.impl;
24
25
26 import java.io.StringWriter;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.StringTokenizer;
31
32 import javax.ws.rs.core.MediaType;
33
34 import org.apache.commons.jcs.access.CacheAccess;
35 import org.apache.zookeeper.KeeperException;
36 import org.apache.zookeeper.KeeperException.NoNodeException;
37 import org.onap.music.datastore.MusicDataStore;
38 import org.onap.music.datastore.PreparedQueryObject;
39 import org.onap.music.datastore.jsonobjects.JsonKeySpace;
40 import org.onap.music.eelf.logging.EELFLoggerDelegate;
41 import org.onap.music.eelf.logging.format.AppMessages;
42 import org.onap.music.eelf.logging.format.ErrorSeverity;
43 import org.onap.music.eelf.logging.format.ErrorTypes;
44 import org.onap.music.exceptions.MusicLockingException;
45 import org.onap.music.exceptions.MusicQueryException;
46 import org.onap.music.exceptions.MusicServiceException;
47 import org.onap.music.lockingservice.cassandra.MusicLockState;
48 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
49 import org.onap.music.lockingservice.zookeeper.MusicLockingService;
50 import org.onap.music.main.MusicUtil;
51 import org.onap.music.main.ResultType;
52 import org.onap.music.main.ReturnType;
53 import org.onap.music.service.MusicCoreService;
54 import org.onap.music.datastore.*;
55
56 import com.datastax.driver.core.ColumnDefinitions;
57 import com.datastax.driver.core.ColumnDefinitions.Definition;
58 import com.datastax.driver.core.DataType;
59 import com.datastax.driver.core.ResultSet;
60 import com.datastax.driver.core.Row;
61 import com.datastax.driver.core.TableMetadata;
62 import com.sun.jersey.api.client.Client;
63 import com.sun.jersey.api.client.ClientResponse;
64 import com.sun.jersey.api.client.WebResource;
65
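66 /**
67  * Implementation of MusicCoreService that uses the ZooKeeper-based
68  * MusicLockingService for locks and MusicDataStoreHandle for reads and writes.
69  *
70  */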
71 public class MusicZKCore implements MusicCoreService {
72
73     public static MusicLockingService mLockHandle = null;
74     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicZKCore.class);
75     private static MusicZKCore musicZKCoreInstance = null;
76     
77     private MusicZKCore() {
78         
79     }
80     public static MusicZKCore getInstance() {
81         
82         if(musicZKCoreInstance == null) {
83             musicZKCoreInstance = new MusicZKCore();
84         }
85         return musicZKCoreInstance;
86     }
87     
88
89
90
91
92     public static MusicLockingService getLockingServiceHandle() throws MusicLockingException {
93         logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
94         long start = System.currentTimeMillis();
95
96         if (mLockHandle == null) {
97             try {
98                 mLockHandle = new MusicLockingService();
99             } catch (Exception e) {
100                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
101                 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
102             }
103         }
104         long end = System.currentTimeMillis();
105         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
106         return mLockHandle;
107     }
108
109
110
111     public  String createLockReference(String lockName) {
112         logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
113         long start = System.currentTimeMillis();
114         String lockId = null;
115         try {
116             lockId = getLockingServiceHandle().createLockId("/" + lockName);
117         } catch (MusicLockingException e) {
118             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.CREATELOCK+lockName,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
119
120         }
121         long end = System.currentTimeMillis();
122         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
123         return lockId;
124     }
125
126     /**
127      *
128      * @param key
129      * @return
130      */
131     public static boolean isTableOrKeySpaceLock(String key) {
132         String[] splitString = key.split("\\.");
133         if (splitString.length > 2)
134             return false;
135         else
136             return true;
137     }
138
139     /**
140      *
141      * @param key
142      * @return
143      */
144     public static MusicLockState getMusicLockState(String key) {
145         long start = System.currentTimeMillis();
146         try {
147             String[] splitString = key.split("\\.");
148             String keyspaceName = splitString[0];
149             String tableName = splitString[1];
150             String primaryKey = splitString[2];
151             MusicLockState mls;
152             String lockName = keyspaceName + "." + tableName + "." + primaryKey;
153             mls = getLockingServiceHandle().getLockState(lockName);
154             long end = System.currentTimeMillis();
155             logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to get lock state:" + (end - start) + " ms");
156             return mls;
157         } catch (NullPointerException | MusicLockingException e) {
158             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
159         }
160         return null;
161     }
162
163     public  ReturnType acquireLockWithLease(String key, String lockId, long leasePeriod) {
164         try {
165             long start = System.currentTimeMillis();
166             /* check if the current lock has exceeded its lease and if yes, release that lock */
167             MusicLockState mls = getMusicLockState(key);
168             if (mls != null) {
169                 if (mls.getLockStatus().equals(LockStatus.LOCKED)) {
170                     logger.info(EELFLoggerDelegate.applicationLogger,"The current lock holder for " + key + " is " + mls.getLockHolder()
171                                     + ". Checking if it has exceeded lease");
172                     long currentLockPeriod = System.currentTimeMillis() - mls.getLeaseStartTime();
173                     long currentLeasePeriod = mls.getLeasePeriod();
174                     if (currentLockPeriod > currentLeasePeriod) {
175                         logger.info(EELFLoggerDelegate.applicationLogger,"Lock period " + currentLockPeriod
176                                         + " has exceeded lease period " + currentLeasePeriod);
177                         boolean voluntaryRelease = false;
178                         String currentLockHolder = mls.getLockHolder();
179                         mls = releaseLock(currentLockHolder, voluntaryRelease);
180                     }
181                 }
182             } else
183                 logger.error(EELFLoggerDelegate.errorLogger,key, AppMessages.INVALIDLOCK,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
184
185             /*
186              * call the traditional acquire lock now and if the result returned is true, set the
187              * begin time-stamp and lease period
188              */
189             if (acquireLock(key, lockId).getResult() == ResultType.SUCCESS) {
190                 mls = getMusicLockState(key);// get latest state
191                 if ( mls == null ) {
192                     logger.info(EELFLoggerDelegate.applicationLogger,"Music Lock State is null");
193                     return new ReturnType(ResultType.FAILURE, "Could not acquire lock, Lock State is null");
194                 }
195                 if (mls.getLeaseStartTime() == -1) {// set it again only if it is not set already
196                     mls.setLeaseStartTime(System.currentTimeMillis());
197                     mls.setLeasePeriod(leasePeriod);
198                     getLockingServiceHandle().setLockState(key, mls);
199                 }
200                 long end = System.currentTimeMillis();
201                 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire leased lock:" + (end - start) + " ms");
202                 return new ReturnType(ResultType.SUCCESS, "Accquired lock");
203             } else {
204                 long end = System.currentTimeMillis();
205                 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to fail to acquire leased lock:" + (end - start) + " ms");
206                 return new ReturnType(ResultType.FAILURE, "Could not acquire lock");
207             }
208         } catch (Exception e) {
209             StringWriter sw = new StringWriter();
210                logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
211
212             String exceptionAsString = sw.toString();
213             return new ReturnType(ResultType.FAILURE,
214                             "Exception thrown in acquireLockWithLease:\n" + exceptionAsString);
215         }
216     }
217
218     public  ReturnType acquireLock(String key, String lockId) throws MusicLockingException {
219         /*
220          * first check if I am on top. Since ids are not reusable there is no need to check
221          * lockStatus If the status is unlocked, then the above call will automatically return
222          * false.
223          */
224         Boolean result = false;
225         try {
226             result = getLockingServiceHandle().isMyTurn(lockId);
227         } catch (MusicLockingException e2) {
228             logger.error(EELFLoggerDelegate.errorLogger,AppMessages.INVALIDLOCK + lockId + " " + e2);
229             throw new MusicLockingException();
230         }
231         if (!result) {
232             logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Not your turn, someone else has the lock");
233             try {
234                 if (!getLockingServiceHandle().lockIdExists(lockId)) {
235                     logger.info(EELFLoggerDelegate.applicationLogger, "In acquire lock: this lockId doesn't exist");
236                     return new ReturnType(ResultType.FAILURE, "Lockid doesn't exist");
237                 }
238             } catch (MusicLockingException e) {
239                 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK+lockId,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
240                  throw new MusicLockingException();
241             }
242             logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: returning failure");
243             return new ReturnType(ResultType.FAILURE, "Not your turn, someone else has the lock");
244         }
245
246
247         // this is for backward compatibility where locks could also be acquired on just
248         // keyspaces or tables.
249         if (isTableOrKeySpaceLock(key)) {
250             logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: A table or keyspace lock so no need to perform sync...so returning true");
251             return new ReturnType(ResultType.SUCCESS, "A table or keyspace lock so no need to perform sync...so returning true");
252         }
253
254         // read the lock name corresponding to the key and if the status is locked or being locked,
255         // then return false
256         MusicLockState currentMls = null;
257         MusicLockState newMls = null;
258         try {
259             currentMls = getMusicLockState(key);
260             String currentLockHolder = null;
261             if(currentMls != null) { currentLockHolder = currentMls.getLockHolder(); };
262             if (lockId.equals(currentLockHolder)) {
263                 logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: You already have the lock!");
264                 return new ReturnType(ResultType.SUCCESS, "You already have the lock!");
265             }
266         } catch (NullPointerException e) {
267             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK+lockId,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
268         }
269
270         // change status to "being locked". This state transition is necessary to ensure syncing
271         // before granting the lock
272         String lockHolder = null;
273         boolean needToSyncQuorum = false;
274         if (currentMls != null)
275             needToSyncQuorum = currentMls.isNeedToSyncQuorum();
276
277
278         newMls = new MusicLockState(MusicLockState.LockStatus.BEING_LOCKED, lockHolder,
279                         needToSyncQuorum);
280         try {
281             getLockingServiceHandle().setLockState(key, newMls);
282         } catch (MusicLockingException e1) {
283             logger.error(EELFLoggerDelegate.errorLogger,e1.getMessage(), AppMessages.LOCKSTATE+key,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
284         }
285         logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Set lock state to being_locked");
286
287         // do syncing if this was a forced lock release
288         if (needToSyncQuorum) {
289             logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Since there was a forcible release, need to sync quorum!");
290             try {
291               syncQuorum(key);
292             } catch (Exception e) {
293               logger.error(EELFLoggerDelegate.errorLogger,"Failed to set Lock state " + e);
294             }
295         }
296
297         // change status to locked
298         lockHolder = lockId;
299         needToSyncQuorum = false;
300         newMls = new MusicLockState(MusicLockState.LockStatus.LOCKED, lockHolder, needToSyncQuorum);
301         try {
302             getLockingServiceHandle().setLockState(key, newMls);
303         } catch (MusicLockingException e) {
304             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKSTATE+key,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
305         }
306         logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Set lock state to locked and assigned current lock ref "
307                         + lockId + " as holder");
308
309         return new ReturnType(result?ResultType.SUCCESS:ResultType.FAILURE, "Set lock state to locked and assigned a lock holder");
310     }
311
312
313
314     /**
315      *
316      * @param keyspaceName
317      * @param kspObject
318      * @return
319      * @throws Exception
320      */
321     public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
322         return true;
323     }
324
325
326     private static void syncQuorum(String key) throws Exception {
327         logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
328         String[] splitString = key.split("\\.");
329         String keyspaceName = splitString[0];
330         String tableName = splitString[1];
331         String primaryKeyValue = splitString[2];
332         PreparedQueryObject selectQuery = new PreparedQueryObject();
333         PreparedQueryObject updateQuery = new PreparedQueryObject();
334
335         // get the primary key d
336         TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspaceName, tableName);
337         String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
338                                                                            // primary key
339         DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
340         Object cqlFormattedPrimaryKeyValue =
341                         MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
342
343         // get the row of data from a quorum
344         selectQuery.appendQueryString("SELECT *  FROM " + keyspaceName + "." + tableName + " WHERE "
345                         + primaryKeyName + "= ?" + ";");
346         selectQuery.addValue(cqlFormattedPrimaryKeyValue);
347         ResultSet results = null;
348         try {
349             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(selectQuery);
350             // write it back to a quorum
351             Row row = results.one();
352             ColumnDefinitions colInfo = row.getColumnDefinitions();
353             int totalColumns = colInfo.size();
354             int counter = 1;
355             StringBuilder fieldValueString = new StringBuilder("");
356             for (Definition definition : colInfo) {
357                 String colName = definition.getName();
358                 if (colName.equals(primaryKeyName))
359                     continue;
360                 DataType colType = definition.getType();
361                 Object valueObj = MusicDataStoreHandle.getDSHandle().getColValue(row, colName, colType);
362                 Object valueString = MusicUtil.convertToActualDataType(colType, valueObj);
363                 fieldValueString.append(colName + " = ?");
364                 updateQuery.addValue(valueString);
365                 if (counter != (totalColumns - 1))
366                     fieldValueString.append(",");
367                 counter = counter + 1;
368             }
369             updateQuery.appendQueryString("UPDATE " + keyspaceName + "." + tableName + " SET "
370                             + fieldValueString + " WHERE " + primaryKeyName + "= ? " + ";");
371             updateQuery.addValue(cqlFormattedPrimaryKeyValue);
372
373             MusicDataStoreHandle.getDSHandle().executePut(updateQuery, "critical");
374         } catch (MusicServiceException | MusicQueryException e) {
375             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.QUERYERROR +""+updateQuery ,ErrorSeverity.MAJOR, ErrorTypes.QUERYERROR);
376         }
377     }
378
379
380
381
382     /**
383      *
384      * @param query
385      * @return ResultSet
386      */
387     public  ResultSet quorumGet(PreparedQueryObject query) {
388         ResultSet results = null;
389         try {
390             results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
391         } catch (MusicServiceException | MusicQueryException e) {
392             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
393
394         }
395         return results;
396
397     }
398
399
400
401     /**
402      *
403      * @param lockName
404      * @return
405      */
406     public  String whoseTurnIsIt(String lockName) {
407
408         try {
409             return getLockingServiceHandle().whoseTurnIsIt("/" + lockName) + "";
410         } catch (MusicLockingException e) {
411              logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+lockName ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
412         }
413         return null;
414
415
416     }
417
418     /**
419      *
420      * @param lockId
421      * @return
422      */
423     public static String getLockNameFromId(String lockId) {
424         StringTokenizer st = new StringTokenizer(lockId);
425         return st.nextToken("$");
426     }
427
428     public void destroyLockRef(String lockId) {
429         long start = System.currentTimeMillis();
430         try {
431             getLockingServiceHandle().unlockAndDeleteId(lockId);
432         } catch (MusicLockingException | NoNodeException e) {
433             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockId  ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
434         }
435         long end = System.currentTimeMillis();
436         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
437     }
438
439     public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
440         long start = System.currentTimeMillis();
441         try {
442             getLockingServiceHandle().unlockAndDeleteId(lockId);
443         } catch (MusicLockingException e1) {
444             logger.error(EELFLoggerDelegate.errorLogger,e1.getMessage(), AppMessages.RELEASELOCK+lockId  ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
445         } catch (KeeperException.NoNodeException nne) {
446             logger.error(EELFLoggerDelegate.errorLogger,"Failed to release Lock " + lockId + " " + nne);
447             MusicLockState mls = new MusicLockState("Lock doesn't exists. Release lock operation failed.");
448             return mls;
449         }
450         String lockName = getLockNameFromId(lockId);
451         MusicLockState mls;
452         String lockHolder = null;
453         if (voluntaryRelease) {
454             mls = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder);
455             logger.info(EELFLoggerDelegate.applicationLogger,"In unlock: lock voluntarily released for " + lockId);
456         } else {
457             boolean needToSyncQuorum = true;
458             mls = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder,
459                             needToSyncQuorum);
460             logger.info(EELFLoggerDelegate.applicationLogger,"In unlock: lock forcibly released for " + lockId);
461         }
462         try {
463             getLockingServiceHandle().setLockState(lockName, mls);
464         } catch (MusicLockingException e) {
465             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.RELEASELOCK+lockId  ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
466         }
467         long end = System.currentTimeMillis();
468         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to release lock:" + (end - start) + " ms");
469         return mls;
470     }
471
472     public static  void  voluntaryReleaseLock(String lockId) throws MusicLockingException{
473         try {
474             getLockingServiceHandle().unlockAndDeleteId(lockId);
475         } catch (KeeperException.NoNodeException e) {
476             // ??? No way
477         }
478     }
479
480     /**
481      *
482      * @param lockName
483      * @throws MusicLockingException
484      */
485     public  void deleteLock(String lockName) throws MusicLockingException {
486         long start = System.currentTimeMillis();
487         logger.info(EELFLoggerDelegate.applicationLogger,"Deleting lock for " + lockName);
488         try {
489             getLockingServiceHandle().deleteLock("/" + lockName);
490         } catch (MusicLockingException e) {
491              logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DELTELOCK+lockName  ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
492              throw new MusicLockingException(e.getMessage());
493         }
494         long end = System.currentTimeMillis();
495         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to delete lock:" + (end - start) + " ms");
496     }
497
498
499     /**
500      *
501      * @param nodeName
502      */
503     public static void pureZkCreate(String nodeName) {
504         try {
505             getLockingServiceHandle().getzkLockHandle().createNode(nodeName);
506         } catch (MusicLockingException e) {
507              logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle "  ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
508         }
509     }
510
511     /**
512      *
513      * @param nodeName
514      * @param data
515      */
516     public static void pureZkWrite(String nodeName, byte[] data) {
517         long start = System.currentTimeMillis();
518         logger.info(EELFLoggerDelegate.applicationLogger,"Performing zookeeper write to " + nodeName);
519         try {
520             getLockingServiceHandle().getzkLockHandle().setNodeData(nodeName, data);
521         } catch (MusicLockingException e) {
522             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle "  ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
523         }
524         logger.info(EELFLoggerDelegate.applicationLogger,"Performed zookeeper write to " + nodeName);
525         long end = System.currentTimeMillis();
526         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the actual zk put:" + (end - start) + " ms");
527     }
528
529     /**
530      *
531      * @param nodeName
532      * @return
533      */
534     public static byte[] pureZkRead(String nodeName) {
535         long start = System.currentTimeMillis();
536         byte[] data = null;
537         try {
538             data = getLockingServiceHandle().getzkLockHandle().getNodeData(nodeName);
539         } catch (MusicLockingException e) {
540             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle "  ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
541         }
542         long end = System.currentTimeMillis();
543         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the actual zk put:" + (end - start) + " ms");
544         return data;
545     }
546
547
548
549     // Prepared Query Additions.
550
551     /**
552      *
553      * @param keyspaceName
554      * @param tableName
555      * @param primaryKey
556      * @param queryObject
557      * @return ReturnType
558      * @throws MusicServiceException
559      */
560     public  ReturnType eventualPut(PreparedQueryObject queryObject) {
561         boolean result = false;
562         try {
563             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
564         } catch (MusicServiceException | MusicQueryException ex) {
565             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle "  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
566             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + "  " + ex.getCause() + " " + ex);
567             return new ReturnType(ResultType.FAILURE, ex.getMessage());
568         }
569         if (result) {
570             return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
571         } else {
572             return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
573         }
574     }
575
576     /**
577      *
578      * @param keyspaceName
579      * @param tableName
580      * @param primaryKey
581      * @param queryObject
582      * @param lockId
583      * @return
584      */
585     public  ReturnType criticalPut(String keyspaceName, String tableName, String primaryKey,
586                     PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
587         long start = System.currentTimeMillis();
588
589         try {
590             MusicLockState mls = getLockingServiceHandle()
591                             .getLockState(keyspaceName + "." + tableName + "." + primaryKey);
592             if (mls.getLockHolder().equals(lockId) == true) {
593                 if (conditionInfo != null)
594                   try {
595                     if (conditionInfo.testCondition() == false)
596                         return new ReturnType(ResultType.FAILURE,
597                                         "Lock acquired but the condition is not true");
598                   } catch (Exception e) {
599                     return new ReturnType(ResultType.FAILURE,
600                             "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
601                                             + e.getMessage());
602                   }
603                 boolean result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.CRITICAL);
604                 long end = System.currentTimeMillis();
605                 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
606                 if (result) {
607                     return new ReturnType(ResultType.SUCCESS, "Update performed");
608                 } else {
609                     return new ReturnType(ResultType.FAILURE, "Update failed to perform");
610                 }
611             } else
612                 return new ReturnType(ResultType.FAILURE,
613                                 "Cannot perform operation since you are the not the lock holder");
614         } catch (MusicQueryException | MusicServiceException  e) {
615             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
616             return new ReturnType(ResultType.FAILURE,
617                             "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
618                                             + e.getMessage());
619         }catch(MusicLockingException ex){
620             return new ReturnType(ResultType.FAILURE,ex.getMessage());
621         }
622
623     }
624
625     /**
626      *
627      * @param queryObject
628      * @param consistency
629      * @return Boolean Indicates success or failure
630      * @throws MusicServiceException
631      *
632      *
633      */
634     public  ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
635         // this is mainly for some functions like keyspace creation etc which does not
636         // really need the bells and whistles of Music locking.
637         boolean result = false;
638         try {
639             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
640         } catch (MusicQueryException | MusicServiceException ex) {
641             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
642             throw new MusicServiceException(ex.getMessage());
643         }
644         return result?ResultType.SUCCESS:ResultType.FAILURE;
645     }
646
647     /**
648      * This method performs DDL operation on cassandra.
649      *
650      * @param queryObject query object containing prepared query and values
651      * @return ResultSet
652      * @throws MusicServiceException
653      */
654     public  ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
655         ResultSet results = null;
656         try {
657             results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
658         } catch (MusicQueryException | MusicServiceException e) {
659             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
660             throw new MusicServiceException(e.getMessage());
661         }
662         return results;
663     }
664
665     public static String getMyHostId() {
666         PreparedQueryObject pQuery = new PreparedQueryObject();
667         pQuery.appendQueryString("SELECT HOST_ID FROM SYSTEM.LOCAL");
668         ResultSet rs = null;
669         try {
670             rs = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(pQuery);
671             Row row = rs.one();
672             return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
673         } catch (Exception e) {
674             e.printStackTrace();
675             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
676         }
677         logger.error(EELFLoggerDelegate.errorLogger, "Some issue during MusicCore.getMyHostId");
678         return "UNKNOW";
679     }
680
681     /**
682      * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
683      * is used to check if the resource is free.
684      *
685      * @param keyspaceName name of the keyspace
686      * @param tableName name of the table
687      * @param primaryKey primary key value
688      * @param queryObject query object containing prepared query and values
689      * @param lockId lock ID to check if the resource is free to perform the operation.
690      * @return ResultSet
691      */
692     public  ResultSet criticalGet(String keyspaceName, String tableName, String primaryKey,
693                     PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
694         ResultSet results = null;
695         try {
696             MusicLockState mls = getLockingServiceHandle()
697                             .getLockState(keyspaceName + "." + tableName + "." + primaryKey);
698             if (mls.getLockHolder().equals(lockId)) {
699                 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
700             } else
701                 throw new MusicServiceException("YOU DO NOT HAVE THE LOCK");
702         } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
703             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
704         }
705         return results;
706     }
707
708     /**
709      * This method performs DML operation on cassandra, when the lock of the dd is acquired.
710      *
711      * @param keyspaceName name of the keyspace
712      * @param tableName name of the table
713      * @param primaryKey primary key value
714      * @param queryObject query object containing prepared query and values
715      * @return ReturnType
716      * @throws MusicLockingException
717      */
718     public  ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
719                     PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
720
721         long start = System.currentTimeMillis();
722         String key = keyspaceName + "." + tableName + "." + primaryKey;
723         String lockId = createLockReference(key);
724         long lockCreationTime = System.currentTimeMillis();
725         ReturnType lockAcqResult = acquireLock(key, lockId);
726         long lockAcqTime = System.currentTimeMillis();
727         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
728             logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
729             ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
730                             queryObject, lockId, conditionInfo);
731             long criticalPutTime = System.currentTimeMillis();
732             voluntaryReleaseLock(lockId);
733             long lockDeleteTime = System.currentTimeMillis();
734             String timingInfo = "|lock creation time:" + (lockCreationTime - start)
735                             + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
736                             + "|critical put time:" + (criticalPutTime - lockAcqTime)
737                             + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
738             criticalPutResult.setTimingInfo(timingInfo);
739             return criticalPutResult;
740         } else {
741             logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
742             destroyLockRef(lockId);
743             return lockAcqResult;
744         }
745     }
746
747     /**
748      * this function is mainly for the benchmarks to see the effect of lock deletion.
749      *
750      * @param keyspaceName
751      * @param tableName
752      * @param primaryKey
753      * @param queryObject
754      * @param conditionInfo
755      * @return
756      * @throws MusicLockingException
757      */
758     public  ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName,
759                     String primaryKey, PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
760
761         long start = System.currentTimeMillis();
762         String key = keyspaceName + "." + tableName + "." + primaryKey;
763         String lockId = createLockReference(key);
764         long lockCreationTime = System.currentTimeMillis();
765         long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
766         ReturnType lockAcqResult = acquireLock(key, lockId);
767         long lockAcqTime = System.currentTimeMillis();
768         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
769             logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
770             ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
771                             queryObject, lockId, conditionInfo);
772             long criticalPutTime = System.currentTimeMillis();
773             deleteLock(key);
774             long lockDeleteTime = System.currentTimeMillis();
775             String timingInfo = "|lock creation time:" + (lockCreationTime - start)
776                             + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
777                             + "|critical put time:" + (criticalPutTime - lockAcqTime)
778                             + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
779             criticalPutResult.setTimingInfo(timingInfo);
780             return criticalPutResult;
781         } else {
782             logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
783             deleteLock(key);
784             return lockAcqResult;
785         }
786     }
787
788
789
790
791     /**
792      * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
793      *
794      * @param keyspaceName name of the keyspace
795      * @param tableName name of the table
796      * @param primaryKey primary key value
797      * @param queryObject query object containing prepared query and values
798      * @return ResultSet
799      * @throws MusicServiceException
800      * @throws MusicLockingException
801      */
802     public  ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
803                     PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
804         String key = keyspaceName + "." + tableName + "." + primaryKey;
805         String lockId = createLockReference(key);
806         long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
807         ReturnType lockAcqResult = acquireLock(key, lockId);
808         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
809             logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
810             ResultSet result =
811                             criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockId);
812             voluntaryReleaseLock(lockId);
813             return result;
814         } else {
815             destroyLockRef(lockId);
816             logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
817             return null;
818         }
819     }
820
821     public  ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
822             PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
823         String key = keyspaceName + "." + tableName + "." + primaryKey;
824         String lockId = createLockReference(key);
825         long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
826
827         ReturnType lockAcqResult = acquireLock(key, lockId);
828
829         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
830             logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
831             ResultSet result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockId);
832             deleteLock(key);
833             return result;
834         } else {
835             deleteLock(key);
836             logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
837             return null;
838         }
839     }
840
841     /**
842      * @param lockName
843      * @return
844      */
845     public Map<String, Object> validateLock(String lockName) {
846         Map<String, Object> resultMap = new HashMap<>();
847         String[] locks = lockName.split("\\.");
848         if(locks.length < 3) {
849             resultMap.put("Error", "Invalid lock. Please make sure lock is of the type keyspaceName.tableName.primaryKey");
850             return resultMap;
851         }
852         String keyspace= locks[0];
853         if(keyspace.startsWith("$"))
854             keyspace = keyspace.substring(1);
855         resultMap.put("keyspace",keyspace);
856         return resultMap;
857     }
858
859
860
861     @Override
862     public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
863             String consistency) throws MusicServiceException {
864         boolean result = false;
865         
866         try {
867             //create shadow locking table 
868             result = createLockQueue(keyspace, table);
869             if(result == false) 
870                 return ResultType.FAILURE;
871
872             result = false;
873             
874             //create table to track unsynced_keys
875             table = "unsyncedKeys_"+table; 
876             
877             String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
878                     + " ( key text,PRIMARY KEY (key) );";
879             System.out.println(tabQuery);
880             PreparedQueryObject queryObject = new PreparedQueryObject(); 
881             
882             queryObject.appendQueryString(tabQuery);
883             result = false;
884             result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
885
886         
887             //create actual table
888             result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
889         } catch (MusicQueryException | MusicServiceException ex) {
890             logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
891             throw new MusicServiceException(ex.getMessage());
892         }
893         return result?ResultType.SUCCESS:ResultType.FAILURE;
894     }
895     
896     public static boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
897         logger.info(EELFLoggerDelegate.applicationLogger,
898                 "Create lock queue/table for " +  keyspace+"."+table);
899         table = "lockQ_"+table; 
900         String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
901                 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
902                 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
903         System.out.println(tabQuery);
904         PreparedQueryObject queryObject = new PreparedQueryObject(); 
905         
906         queryObject.appendQueryString(tabQuery);
907         boolean result;
908         result = MusicDataStoreHandle.mDstoreHandle.executePut(queryObject, "eventual");
909         return result;
910     }
911
912
913     @Override
914     public List<String> getLockQueue(String fullyQualifiedKey)
915             throws MusicServiceException, MusicQueryException, MusicLockingException {
916         // TODO Auto-generated method stub
917         return null;
918     }
919
920
921
922     @Override
923     public long getLockQueueSize(String fullyQualifiedKey)
924             throws MusicServiceException, MusicQueryException, MusicLockingException {
925         // TODO Auto-generated method stub
926         return 0;
927     }
928     @Override
929     public ReturnType eventualPut_nb(PreparedQueryObject queryObject, String keyspace, String tablename,
930             String primaryKey) {
931         return eventualPut(queryObject);
932     }
933
934
935
936
937 }