2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
23 package org.onap.music.service.impl;
26 import java.io.StringWriter;
27 import java.util.HashMap;
28 import java.util.List;
30 import java.util.StringTokenizer;
32 import javax.ws.rs.core.MediaType;
34 import org.apache.commons.jcs.access.CacheAccess;
35 import org.apache.zookeeper.KeeperException;
36 import org.apache.zookeeper.KeeperException.NoNodeException;
37 import org.onap.music.datastore.MusicDataStore;
38 import org.onap.music.datastore.PreparedQueryObject;
39 import org.onap.music.datastore.jsonobjects.JsonKeySpace;
40 import org.onap.music.eelf.logging.EELFLoggerDelegate;
41 import org.onap.music.eelf.logging.format.AppMessages;
42 import org.onap.music.eelf.logging.format.ErrorSeverity;
43 import org.onap.music.eelf.logging.format.ErrorTypes;
44 import org.onap.music.exceptions.MusicLockingException;
45 import org.onap.music.exceptions.MusicQueryException;
46 import org.onap.music.exceptions.MusicServiceException;
47 import org.onap.music.lockingservice.cassandra.MusicLockState;
48 import org.onap.music.lockingservice.cassandra.MusicLockState.LockStatus;
49 import org.onap.music.lockingservice.zookeeper.MusicLockingService;
50 import org.onap.music.main.MusicUtil;
51 import org.onap.music.main.ResultType;
52 import org.onap.music.main.ReturnType;
53 import org.onap.music.service.MusicCoreService;
54 import org.onap.music.datastore.*;
56 import com.datastax.driver.core.ColumnDefinitions;
57 import com.datastax.driver.core.ColumnDefinitions.Definition;
58 import com.datastax.driver.core.DataType;
59 import com.datastax.driver.core.ResultSet;
60 import com.datastax.driver.core.Row;
61 import com.datastax.driver.core.TableMetadata;
62 import com.sun.jersey.api.client.Client;
63 import com.sun.jersey.api.client.ClientResponse;
64 import com.sun.jersey.api.client.WebResource;
71 public class MusicZKCore implements MusicCoreService {
73 public static MusicLockingService mLockHandle = null;
74 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicZKCore.class);
75 private static MusicZKCore musicZKCoreInstance = null;
77 private MusicZKCore() {
80 public static MusicZKCore getInstance() {
82 if(musicZKCoreInstance == null) {
83 musicZKCoreInstance = new MusicZKCore();
85 return musicZKCoreInstance;
92 public static MusicLockingService getLockingServiceHandle() throws MusicLockingException {
93 logger.info(EELFLoggerDelegate.applicationLogger,"Acquiring lock store handle");
94 long start = System.currentTimeMillis();
96 if (mLockHandle == null) {
98 mLockHandle = new MusicLockingService();
99 } catch (Exception e) {
100 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKHANDLE,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
101 throw new MusicLockingException("Failed to aquire Locl store handle " + e);
104 long end = System.currentTimeMillis();
105 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire lock store handle:" + (end - start) + " ms");
111 public String createLockReference(String lockName) {
112 logger.info(EELFLoggerDelegate.applicationLogger,"Creating lock reference for lock name:" + lockName);
113 long start = System.currentTimeMillis();
114 String lockId = null;
116 lockId = getLockingServiceHandle().createLockId("/" + lockName);
117 } catch (MusicLockingException e) {
118 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.CREATELOCK+lockName,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
121 long end = System.currentTimeMillis();
122 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
131 public static boolean isTableOrKeySpaceLock(String key) {
132 String[] splitString = key.split("\\.");
133 if (splitString.length > 2)
144 public static MusicLockState getMusicLockState(String key) {
145 long start = System.currentTimeMillis();
147 String[] splitString = key.split("\\.");
148 String keyspaceName = splitString[0];
149 String tableName = splitString[1];
150 String primaryKey = splitString[2];
152 String lockName = keyspaceName + "." + tableName + "." + primaryKey;
153 mls = getLockingServiceHandle().getLockState(lockName);
154 long end = System.currentTimeMillis();
155 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to get lock state:" + (end - start) + " ms");
157 } catch (NullPointerException | MusicLockingException e) {
158 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
163 public ReturnType acquireLockWithLease(String key, String lockId, long leasePeriod) {
165 long start = System.currentTimeMillis();
166 /* check if the current lock has exceeded its lease and if yes, release that lock */
167 MusicLockState mls = getMusicLockState(key);
169 if (mls.getLockStatus().equals(LockStatus.LOCKED)) {
170 logger.info(EELFLoggerDelegate.applicationLogger,"The current lock holder for " + key + " is " + mls.getLockHolder()
171 + ". Checking if it has exceeded lease");
172 long currentLockPeriod = System.currentTimeMillis() - mls.getLeaseStartTime();
173 long currentLeasePeriod = mls.getLeasePeriod();
174 if (currentLockPeriod > currentLeasePeriod) {
175 logger.info(EELFLoggerDelegate.applicationLogger,"Lock period " + currentLockPeriod
176 + " has exceeded lease period " + currentLeasePeriod);
177 boolean voluntaryRelease = false;
178 String currentLockHolder = mls.getLockHolder();
179 mls = releaseLock(currentLockHolder, voluntaryRelease);
183 logger.error(EELFLoggerDelegate.errorLogger,key, AppMessages.INVALIDLOCK,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
186 * call the traditional acquire lock now and if the result returned is true, set the
187 * begin time-stamp and lease period
189 if (acquireLock(key, lockId).getResult() == ResultType.SUCCESS) {
190 mls = getMusicLockState(key);// get latest state
192 logger.info(EELFLoggerDelegate.applicationLogger,"Music Lock State is null");
193 return new ReturnType(ResultType.FAILURE, "Could not acquire lock, Lock State is null");
195 if (mls.getLeaseStartTime() == -1) {// set it again only if it is not set already
196 mls.setLeaseStartTime(System.currentTimeMillis());
197 mls.setLeasePeriod(leasePeriod);
198 getLockingServiceHandle().setLockState(key, mls);
200 long end = System.currentTimeMillis();
201 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to acquire leased lock:" + (end - start) + " ms");
202 return new ReturnType(ResultType.SUCCESS, "Accquired lock");
204 long end = System.currentTimeMillis();
205 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to fail to acquire leased lock:" + (end - start) + " ms");
206 return new ReturnType(ResultType.FAILURE, "Could not acquire lock");
208 } catch (Exception e) {
209 StringWriter sw = new StringWriter();
210 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR506E] Failed to aquire lock ",ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
212 String exceptionAsString = sw.toString();
213 return new ReturnType(ResultType.FAILURE,
214 "Exception thrown in acquireLockWithLease:\n" + exceptionAsString);
218 public ReturnType acquireLock(String key, String lockId) throws MusicLockingException {
220 * first check if I am on top. Since ids are not reusable there is no need to check
221 * lockStatus If the status is unlocked, then the above call will automatically return
224 Boolean result = false;
226 result = getLockingServiceHandle().isMyTurn(lockId);
227 } catch (MusicLockingException e2) {
228 logger.error(EELFLoggerDelegate.errorLogger,AppMessages.INVALIDLOCK + lockId + " " + e2);
229 throw new MusicLockingException();
232 logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Not your turn, someone else has the lock");
234 if (!getLockingServiceHandle().lockIdExists(lockId)) {
235 logger.info(EELFLoggerDelegate.applicationLogger, "In acquire lock: this lockId doesn't exist");
236 return new ReturnType(ResultType.FAILURE, "Lockid doesn't exist");
238 } catch (MusicLockingException e) {
239 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK+lockId,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
240 throw new MusicLockingException();
242 logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: returning failure");
243 return new ReturnType(ResultType.FAILURE, "Not your turn, someone else has the lock");
247 // this is for backward compatibility where locks could also be acquired on just
248 // keyspaces or tables.
249 if (isTableOrKeySpaceLock(key)) {
250 logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: A table or keyspace lock so no need to perform sync...so returning true");
251 return new ReturnType(ResultType.SUCCESS, "A table or keyspace lock so no need to perform sync...so returning true");
254 // read the lock name corresponding to the key and if the status is locked or being locked,
256 MusicLockState currentMls = null;
257 MusicLockState newMls = null;
259 currentMls = getMusicLockState(key);
260 String currentLockHolder = null;
261 if(currentMls != null) { currentLockHolder = currentMls.getLockHolder(); };
262 if (lockId.equals(currentLockHolder)) {
263 logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: You already have the lock!");
264 return new ReturnType(ResultType.SUCCESS, "You already have the lock!");
266 } catch (NullPointerException e) {
267 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.INVALIDLOCK+lockId,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
270 // change status to "being locked". This state transition is necessary to ensure syncing
271 // before granting the lock
272 String lockHolder = null;
273 boolean needToSyncQuorum = false;
274 if (currentMls != null)
275 needToSyncQuorum = currentMls.isNeedToSyncQuorum();
278 newMls = new MusicLockState(MusicLockState.LockStatus.BEING_LOCKED, lockHolder,
281 getLockingServiceHandle().setLockState(key, newMls);
282 } catch (MusicLockingException e1) {
283 logger.error(EELFLoggerDelegate.errorLogger,e1.getMessage(), AppMessages.LOCKSTATE+key,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
285 logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Set lock state to being_locked");
287 // do syncing if this was a forced lock release
288 if (needToSyncQuorum) {
289 logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Since there was a forcible release, need to sync quorum!");
292 } catch (Exception e) {
293 logger.error(EELFLoggerDelegate.errorLogger,"Failed to set Lock state " + e);
297 // change status to locked
299 needToSyncQuorum = false;
300 newMls = new MusicLockState(MusicLockState.LockStatus.LOCKED, lockHolder, needToSyncQuorum);
302 getLockingServiceHandle().setLockState(key, newMls);
303 } catch (MusicLockingException e) {
304 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKSTATE+key,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
306 logger.info(EELFLoggerDelegate.applicationLogger,"In acquire lock: Set lock state to locked and assigned current lock ref "
307 + lockId + " as holder");
309 return new ReturnType(result?ResultType.SUCCESS:ResultType.FAILURE, "Set lock state to locked and assigned a lock holder");
316 * @param keyspaceName
321 public boolean createKeyspace(String keyspaceName, JsonKeySpace kspObject) throws Exception {
326 private static void syncQuorum(String key) throws Exception {
327 logger.info(EELFLoggerDelegate.applicationLogger,"Performing sync operation---");
328 String[] splitString = key.split("\\.");
329 String keyspaceName = splitString[0];
330 String tableName = splitString[1];
331 String primaryKeyValue = splitString[2];
332 PreparedQueryObject selectQuery = new PreparedQueryObject();
333 PreparedQueryObject updateQuery = new PreparedQueryObject();
335 // get the primary key d
336 TableMetadata tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspaceName, tableName);
337 String primaryKeyName = tableInfo.getPrimaryKey().get(0).getName();// we only support single
339 DataType primaryKeyType = tableInfo.getPrimaryKey().get(0).getType();
340 Object cqlFormattedPrimaryKeyValue =
341 MusicUtil.convertToActualDataType(primaryKeyType, primaryKeyValue);
343 // get the row of data from a quorum
344 selectQuery.appendQueryString("SELECT * FROM " + keyspaceName + "." + tableName + " WHERE "
345 + primaryKeyName + "= ?" + ";");
346 selectQuery.addValue(cqlFormattedPrimaryKeyValue);
347 ResultSet results = null;
349 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(selectQuery);
350 // write it back to a quorum
351 Row row = results.one();
352 ColumnDefinitions colInfo = row.getColumnDefinitions();
353 int totalColumns = colInfo.size();
355 StringBuilder fieldValueString = new StringBuilder("");
356 for (Definition definition : colInfo) {
357 String colName = definition.getName();
358 if (colName.equals(primaryKeyName))
360 DataType colType = definition.getType();
361 Object valueObj = MusicDataStoreHandle.getDSHandle().getColValue(row, colName, colType);
362 Object valueString = MusicUtil.convertToActualDataType(colType, valueObj);
363 fieldValueString.append(colName + " = ?");
364 updateQuery.addValue(valueString);
365 if (counter != (totalColumns - 1))
366 fieldValueString.append(",");
367 counter = counter + 1;
369 updateQuery.appendQueryString("UPDATE " + keyspaceName + "." + tableName + " SET "
370 + fieldValueString + " WHERE " + primaryKeyName + "= ? " + ";");
371 updateQuery.addValue(cqlFormattedPrimaryKeyValue);
373 MusicDataStoreHandle.getDSHandle().executePut(updateQuery, "critical");
374 } catch (MusicServiceException | MusicQueryException e) {
375 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.QUERYERROR +""+updateQuery ,ErrorSeverity.MAJOR, ErrorTypes.QUERYERROR);
387 public ResultSet quorumGet(PreparedQueryObject query) {
388 ResultSet results = null;
390 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(query);
391 } catch (MusicServiceException | MusicQueryException e) {
392 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
406 public String whoseTurnIsIt(String lockName) {
409 return getLockingServiceHandle().whoseTurnIsIt("/" + lockName) + "";
410 } catch (MusicLockingException e) {
411 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.LOCKINGERROR+lockName ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
423 public static String getLockNameFromId(String lockId) {
424 StringTokenizer st = new StringTokenizer(lockId);
425 return st.nextToken("$");
428 public void destroyLockRef(String lockId) {
429 long start = System.currentTimeMillis();
431 getLockingServiceHandle().unlockAndDeleteId(lockId);
432 } catch (MusicLockingException | NoNodeException e) {
433 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DESTROYLOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
435 long end = System.currentTimeMillis();
436 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to destroy lock reference:" + (end - start) + " ms");
439 public MusicLockState releaseLock(String lockId, boolean voluntaryRelease) {
440 long start = System.currentTimeMillis();
442 getLockingServiceHandle().unlockAndDeleteId(lockId);
443 } catch (MusicLockingException e1) {
444 logger.error(EELFLoggerDelegate.errorLogger,e1.getMessage(), AppMessages.RELEASELOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
445 } catch (KeeperException.NoNodeException nne) {
446 logger.error(EELFLoggerDelegate.errorLogger,"Failed to release Lock " + lockId + " " + nne);
447 MusicLockState mls = new MusicLockState("Lock doesn't exists. Release lock operation failed.");
450 String lockName = getLockNameFromId(lockId);
452 String lockHolder = null;
453 if (voluntaryRelease) {
454 mls = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder);
455 logger.info(EELFLoggerDelegate.applicationLogger,"In unlock: lock voluntarily released for " + lockId);
457 boolean needToSyncQuorum = true;
458 mls = new MusicLockState(MusicLockState.LockStatus.UNLOCKED, lockHolder,
460 logger.info(EELFLoggerDelegate.applicationLogger,"In unlock: lock forcibly released for " + lockId);
463 getLockingServiceHandle().setLockState(lockName, mls);
464 } catch (MusicLockingException e) {
465 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.RELEASELOCK+lockId ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
467 long end = System.currentTimeMillis();
468 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to release lock:" + (end - start) + " ms");
472 public static void voluntaryReleaseLock(String lockId) throws MusicLockingException{
474 getLockingServiceHandle().unlockAndDeleteId(lockId);
475 } catch (KeeperException.NoNodeException e) {
483 * @throws MusicLockingException
485 public void deleteLock(String lockName) throws MusicLockingException {
486 long start = System.currentTimeMillis();
487 logger.info(EELFLoggerDelegate.applicationLogger,"Deleting lock for " + lockName);
489 getLockingServiceHandle().deleteLock("/" + lockName);
490 } catch (MusicLockingException e) {
491 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.DELTELOCK+lockName ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
492 throw new MusicLockingException(e.getMessage());
494 long end = System.currentTimeMillis();
495 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to delete lock:" + (end - start) + " ms");
503 public static void pureZkCreate(String nodeName) {
505 getLockingServiceHandle().getzkLockHandle().createNode(nodeName);
506 } catch (MusicLockingException e) {
507 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
516 public static void pureZkWrite(String nodeName, byte[] data) {
517 long start = System.currentTimeMillis();
518 logger.info(EELFLoggerDelegate.applicationLogger,"Performing zookeeper write to " + nodeName);
520 getLockingServiceHandle().getzkLockHandle().setNodeData(nodeName, data);
521 } catch (MusicLockingException e) {
522 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
524 logger.info(EELFLoggerDelegate.applicationLogger,"Performed zookeeper write to " + nodeName);
525 long end = System.currentTimeMillis();
526 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the actual zk put:" + (end - start) + " ms");
534 public static byte[] pureZkRead(String nodeName) {
535 long start = System.currentTimeMillis();
538 data = getLockingServiceHandle().getzkLockHandle().getNodeData(nodeName);
539 } catch (MusicLockingException e) {
540 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.CRITICAL, ErrorTypes.LOCKINGERROR);
542 long end = System.currentTimeMillis();
543 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the actual zk put:" + (end - start) + " ms");
549 // Prepared Query Additions.
553 * @param keyspaceName
558 * @throws MusicServiceException
560 public ReturnType eventualPut(PreparedQueryObject queryObject) {
561 boolean result = false;
563 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.EVENTUAL);
564 } catch (MusicServiceException | MusicQueryException ex) {
565 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), "[ERR512E] Failed to get ZK Lock Handle " ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
566 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() + " " + ex.getCause() + " " + ex);
567 return new ReturnType(ResultType.FAILURE, ex.getMessage());
570 return new ReturnType(ResultType.SUCCESS, "Eventual Operation Successfully performed");
572 return new ReturnType(ResultType.FAILURE, "Eventual Operation failed to perform");
578 * @param keyspaceName
585 public ReturnType criticalPut(String keyspaceName, String tableName, String primaryKey,
586 PreparedQueryObject queryObject, String lockId, Condition conditionInfo) {
587 long start = System.currentTimeMillis();
590 MusicLockState mls = getLockingServiceHandle()
591 .getLockState(keyspaceName + "." + tableName + "." + primaryKey);
592 if (mls.getLockHolder().equals(lockId) == true) {
593 if (conditionInfo != null)
595 if (conditionInfo.testCondition() == false)
596 return new ReturnType(ResultType.FAILURE,
597 "Lock acquired but the condition is not true");
598 } catch (Exception e) {
599 return new ReturnType(ResultType.FAILURE,
600 "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
603 boolean result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, MusicUtil.CRITICAL);
604 long end = System.currentTimeMillis();
605 logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
607 return new ReturnType(ResultType.SUCCESS, "Update performed");
609 return new ReturnType(ResultType.FAILURE, "Update failed to perform");
612 return new ReturnType(ResultType.FAILURE,
613 "Cannot perform operation since you are the not the lock holder");
614 } catch (MusicQueryException | MusicServiceException e) {
615 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
616 return new ReturnType(ResultType.FAILURE,
617 "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
619 }catch(MusicLockingException ex){
620 return new ReturnType(ResultType.FAILURE,ex.getMessage());
629 * @return Boolean Indicates success or failure
630 * @throws MusicServiceException
634 public ResultType nonKeyRelatedPut(PreparedQueryObject queryObject, String consistency) throws MusicServiceException {
635 // this is mainly for some functions like keyspace creation etc which does not
636 // really need the bells and whistles of Music locking.
637 boolean result = false;
639 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, consistency);
640 } catch (MusicQueryException | MusicServiceException ex) {
641 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
642 throw new MusicServiceException(ex.getMessage());
644 return result?ResultType.SUCCESS:ResultType.FAILURE;
648 * This method performs DDL operation on cassandra.
650 * @param queryObject query object containing prepared query and values
652 * @throws MusicServiceException
654 public ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
655 ResultSet results = null;
657 results = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(queryObject);
658 } catch (MusicQueryException | MusicServiceException e) {
659 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
660 throw new MusicServiceException(e.getMessage());
665 public static String getMyHostId() {
666 PreparedQueryObject pQuery = new PreparedQueryObject();
667 pQuery.appendQueryString("SELECT HOST_ID FROM SYSTEM.LOCAL");
670 rs = MusicDataStoreHandle.getDSHandle().executeOneConsistencyGet(pQuery);
672 return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
673 } catch (Exception e) {
675 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
677 logger.error(EELFLoggerDelegate.errorLogger, "Some issue during MusicCore.getMyHostId");
682 * This method performs DDL operations on cassandra, if the the resource is available. Lock ID
683 * is used to check if the resource is free.
685 * @param keyspaceName name of the keyspace
686 * @param tableName name of the table
687 * @param primaryKey primary key value
688 * @param queryObject query object containing prepared query and values
689 * @param lockId lock ID to check if the resource is free to perform the operation.
692 public ResultSet criticalGet(String keyspaceName, String tableName, String primaryKey,
693 PreparedQueryObject queryObject, String lockId) throws MusicServiceException {
694 ResultSet results = null;
696 MusicLockState mls = getLockingServiceHandle()
697 .getLockState(keyspaceName + "." + tableName + "." + primaryKey);
698 if (mls.getLockHolder().equals(lockId)) {
699 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryObject);
701 throw new MusicServiceException("YOU DO NOT HAVE THE LOCK");
702 } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
703 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
709 * This method performs DML operation on cassandra, when the lock of the dd is acquired.
711 * @param keyspaceName name of the keyspace
712 * @param tableName name of the table
713 * @param primaryKey primary key value
714 * @param queryObject query object containing prepared query and values
716 * @throws MusicLockingException
718 public ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
719 PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
721 long start = System.currentTimeMillis();
722 String key = keyspaceName + "." + tableName + "." + primaryKey;
723 String lockId = createLockReference(key);
724 long lockCreationTime = System.currentTimeMillis();
725 ReturnType lockAcqResult = acquireLock(key, lockId);
726 long lockAcqTime = System.currentTimeMillis();
727 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
728 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
729 ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
730 queryObject, lockId, conditionInfo);
731 long criticalPutTime = System.currentTimeMillis();
732 voluntaryReleaseLock(lockId);
733 long lockDeleteTime = System.currentTimeMillis();
734 String timingInfo = "|lock creation time:" + (lockCreationTime - start)
735 + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
736 + "|critical put time:" + (criticalPutTime - lockAcqTime)
737 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
738 criticalPutResult.setTimingInfo(timingInfo);
739 return criticalPutResult;
741 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
742 destroyLockRef(lockId);
743 return lockAcqResult;
748 * this function is mainly for the benchmarks to see the effect of lock deletion.
750 * @param keyspaceName
754 * @param conditionInfo
756 * @throws MusicLockingException
758 public ReturnType atomicPutWithDeleteLock(String keyspaceName, String tableName,
759 String primaryKey, PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException {
761 long start = System.currentTimeMillis();
762 String key = keyspaceName + "." + tableName + "." + primaryKey;
763 String lockId = createLockReference(key);
764 long lockCreationTime = System.currentTimeMillis();
765 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
766 ReturnType lockAcqResult = acquireLock(key, lockId);
767 long lockAcqTime = System.currentTimeMillis();
768 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
769 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
770 ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
771 queryObject, lockId, conditionInfo);
772 long criticalPutTime = System.currentTimeMillis();
774 long lockDeleteTime = System.currentTimeMillis();
775 String timingInfo = "|lock creation time:" + (lockCreationTime - start)
776 + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
777 + "|critical put time:" + (criticalPutTime - lockAcqTime)
778 + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
779 criticalPutResult.setTimingInfo(timingInfo);
780 return criticalPutResult;
782 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
784 return lockAcqResult;
792 * This method performs DDL operation on cassasndra, when the lock for the resource is acquired.
794 * @param keyspaceName name of the keyspace
795 * @param tableName name of the table
796 * @param primaryKey primary key value
797 * @param queryObject query object containing prepared query and values
799 * @throws MusicServiceException
800 * @throws MusicLockingException
802 public ResultSet atomicGet(String keyspaceName, String tableName, String primaryKey,
803 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
804 String key = keyspaceName + "." + tableName + "." + primaryKey;
805 String lockId = createLockReference(key);
806 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
807 ReturnType lockAcqResult = acquireLock(key, lockId);
808 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
809 logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockId);
811 criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockId);
812 voluntaryReleaseLock(lockId);
815 destroyLockRef(lockId);
816 logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockId);
821 public ResultSet atomicGetWithDeleteLock(String keyspaceName, String tableName, String primaryKey,
822 PreparedQueryObject queryObject) throws MusicServiceException, MusicLockingException {
823 String key = keyspaceName + "." + tableName + "." + primaryKey;
824 String lockId = createLockReference(key);
825 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
827 ReturnType lockAcqResult = acquireLock(key, lockId);
829 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
830 logger.info(EELFLoggerDelegate.applicationLogger, "acquired lock with id " + lockId);
831 ResultSet result = criticalGet(keyspaceName, tableName, primaryKey, queryObject, lockId);
836 logger.info(EELFLoggerDelegate.applicationLogger, "unable to acquire lock, id " + lockId);
845 public Map<String, Object> validateLock(String lockName) {
846 Map<String, Object> resultMap = new HashMap<>();
847 String[] locks = lockName.split("\\.");
848 if(locks.length < 3) {
849 resultMap.put("Error", "Invalid lock. Please make sure lock is of the type keyspaceName.tableName.primaryKey");
852 String keyspace= locks[0];
853 if(keyspace.startsWith("$"))
854 keyspace = keyspace.substring(1);
855 resultMap.put("keyspace",keyspace);
862 public ResultType createTable(String keyspace, String table, PreparedQueryObject tableQueryObject,
863 String consistency) throws MusicServiceException {
864 boolean result = false;
867 //create shadow locking table
868 result = createLockQueue(keyspace, table);
870 return ResultType.FAILURE;
874 //create table to track unsynced_keys
875 table = "unsyncedKeys_"+table;
877 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
878 + " ( key text,PRIMARY KEY (key) );";
879 System.out.println(tabQuery);
880 PreparedQueryObject queryObject = new PreparedQueryObject();
882 queryObject.appendQueryString(tabQuery);
884 result = MusicDataStoreHandle.getDSHandle().executePut(queryObject, "eventual");
887 //create actual table
888 result = MusicDataStoreHandle.getDSHandle().executePut(tableQueryObject, consistency);
889 } catch (MusicQueryException | MusicServiceException ex) {
890 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
891 throw new MusicServiceException(ex.getMessage());
893 return result?ResultType.SUCCESS:ResultType.FAILURE;
896 public static boolean createLockQueue(String keyspace, String table) throws MusicServiceException, MusicQueryException {
897 logger.info(EELFLoggerDelegate.applicationLogger,
898 "Create lock queue/table for " + keyspace+"."+table);
899 table = "lockQ_"+table;
900 String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
901 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
902 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
903 System.out.println(tabQuery);
904 PreparedQueryObject queryObject = new PreparedQueryObject();
906 queryObject.appendQueryString(tabQuery);
908 result = MusicDataStoreHandle.mDstoreHandle.executePut(queryObject, "eventual");
914 public List<String> getLockQueue(String fullyQualifiedKey)
915 throws MusicServiceException, MusicQueryException, MusicLockingException {
916 // TODO Auto-generated method stub
923 public long getLockQueueSize(String fullyQualifiedKey)
924 throws MusicServiceException, MusicQueryException, MusicLockingException {
925 // TODO Auto-generated method stub
929 public ReturnType eventualPut_nb(PreparedQueryObject queryObject, String keyspace, String tablename,
931 return eventualPut(queryObject);