- private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(RestMusicDataAPI.class);
- private static final String CRITICAL = "critical";
-
- public static ReturnType conditionalInsert(String keyspace, String tablename, String casscadeColumnName,
- Map<String, Object> casscadeColumnData, String primaryKey, Map<String, Object> valuesMap,
- Map<String, String> status) throws Exception {
-
- Map<String, PreparedQueryObject> queryBank = new HashMap<>();
- TableMetadata tableInfo = null;
- tableInfo = MusicCore.returnColumnMetadata(keyspace, tablename);
- DataType primaryIdType = tableInfo.getPrimaryKey().get(0).getType();
- String primaryId = tableInfo.getPrimaryKey().get(0).getName();
- DataType casscadeColumnType = tableInfo.getColumn(casscadeColumnName).getType();
- String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
-
- PreparedQueryObject select = new PreparedQueryObject();
- select.appendQueryString("SELECT * FROM " + keyspace + "." + tablename + " where " + primaryId + " = ?");
- select.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
- queryBank.put(MusicUtil.SELECT, select);
-
- PreparedQueryObject update = new PreparedQueryObject();
- Map<String, String> updateColumnvalues = new HashMap<>(); //casscade column values
- updateColumnvalues = getValues(true, casscadeColumnData, status);
- Object formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, updateColumnvalues);
- update.appendQueryString("UPDATE " + keyspace + "." + tablename + " SET " + casscadeColumnName + " ="
- + casscadeColumnName + " + ? , vector_ts = ?" + " WHERE " + primaryId + " = ? ");
- update.addValue(formatedValues);
- update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector));
- update.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
- queryBank.put(MusicUtil.UPDATE, update);
-
-
- Map<String, String> insertColumnvalues = new HashMap<>();//casscade column values
- insertColumnvalues = getValues(false, casscadeColumnData, status);
- formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, insertColumnvalues);
- PreparedQueryObject insert = extractQuery(valuesMap, tableInfo, tablename, keyspace, primaryId, primaryKey,casscadeColumnName,formatedValues);
- queryBank.put(MusicUtil.INSERT, insert);
-
-
- String key = keyspace + "." + tablename + "." + primaryKey;
- String lockId = MusicCore.createLockReference(key);
- long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
- ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
-
- try {
- if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
- ReturnType criticalPutResult = conditionalInsertAtomic(lockId, keyspace, tablename, primaryKey,
- queryBank);
- MusicCore.destroyLockRef(lockId);
- if (criticalPutResult.getMessage().contains("insert"))
- criticalPutResult
- .setMessage("Insert values: ");
- else if (criticalPutResult.getMessage().contains("update"))
- criticalPutResult
- .setMessage("Update values: " + updateColumnvalues);
- return criticalPutResult;
-
- } else {
- MusicCore.destroyLockRef(lockId);
- return lockAcqResult;
- }
- } catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED);
- MusicCore.destroyLockRef(lockId);
- return new ReturnType(ResultType.FAILURE, e.getMessage());
- }
-
- }
-
- public static ReturnType conditionalInsertAtomic(String lockId, String keyspace, String tableName,
- String primaryKey, Map<String, PreparedQueryObject> queryBank) {
-
- ResultSet results = null;
-
- try {
-
- MusicLockState mls = MusicCore.getLockingServiceHandle()
- .getLockState(keyspace + "." + tableName + "." + primaryKey);
- if (mls.getLockHolder().equals(lockId)) {
- try {
- results = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT));
- } catch (Exception e) {
- return new ReturnType(ResultType.FAILURE, e.getMessage());
- }
- if (results.all().isEmpty()) {
- MusicCore.getDSHandle().executePut(queryBank.get(MusicUtil.INSERT), CRITICAL);
- return new ReturnType(ResultType.SUCCESS, "insert");
- } else {
- MusicCore.getDSHandle().executePut(queryBank.get(MusicUtil.UPDATE), CRITICAL);
- return new ReturnType(ResultType.SUCCESS, "update");
- }
- } else {
- return new ReturnType(ResultType.FAILURE,
- "Cannot perform operation since you are the not the lock holder");
- }
-
- } catch (Exception e) {
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
- String exceptionAsString = sw.toString();
- return new ReturnType(ResultType.FAILURE,
- "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
- + exceptionAsString);
- }
-
- }
-
- public static ReturnType update(Map<String,PreparedQueryObject> queryBank, String keyspace, String tableName, String primaryKey,String primaryKeyValue,String planId,String cascadeColumnName,Map<String,String> cascadeColumnValues) {
-
- String key = keyspace + "." + tableName + "." + primaryKeyValue;
- String lockId = MusicCore.createLockReference(key);
- long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
- ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
-
- try {
-
- if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
- return updateAtomic(lockId, keyspace, tableName, primaryKey,primaryKeyValue, queryBank,planId,cascadeColumnValues,cascadeColumnName);
-
- } else {
- MusicCore.destroyLockRef(lockId);
- return lockAcqResult;
- }
-
- } catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
- MusicCore.destroyLockRef(lockId);
- return new ReturnType(ResultType.FAILURE, e.getMessage());
-
- }
- }
-
- public static ReturnType updateAtomic(String lockId, String keyspace, String tableName, String primaryKey,String primaryKeyValue,
- Map<String,PreparedQueryObject> queryBank,String planId,Map<String,String> cascadeColumnValues,String casscadeColumnName) {
- try {
-
- MusicLockState mls = MusicCore.getLockingServiceHandle()
- .getLockState(keyspace + "." + tableName + "." + primaryKeyValue);
- if (mls.getLockHolder().equals(lockId)) {
- Row row = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT)).one();
-
- if(row != null) {
- Map<String, String> updatedValues = cascadeColumnUpdateSpecific(row, cascadeColumnValues, casscadeColumnName, planId);
- JSONObject json = new JSONObject(updatedValues);
- PreparedQueryObject update = new PreparedQueryObject();
- String vector_ts = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
- update.appendQueryString("UPDATE " + keyspace + "." + tableName + " SET " + casscadeColumnName + "['" + planId
- + "'] = ?, vector_ts = ? WHERE " + primaryKey + " = ?");
- update.addValue(MusicUtil.convertToActualDataType(DataType.text(), json.toString()));
- update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector_ts));
- update.addValue(MusicUtil.convertToActualDataType(DataType.text(), primaryKeyValue));
- try {
- MusicCore.getDSHandle().executePut(update, CRITICAL);
- } catch (Exception ex) {
- return new ReturnType(ResultType.FAILURE, ex.getMessage());
- }
- }else {
- return new ReturnType(ResultType.FAILURE,"Cannot find data related to key: "+primaryKey);
- }
- MusicCore.getDSHandle().executePut(queryBank.get(MusicUtil.UPSERT), CRITICAL);
- return new ReturnType(ResultType.SUCCESS, "update success");
-
- } else {
- return new ReturnType(ResultType.FAILURE,
- "Cannot perform operation since you are the not the lock holder");
- }
-
- } catch (Exception e) {
- logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.EXECUTIONINTERRUPTED, ErrorSeverity.ERROR, ErrorTypes.LOCKINGERROR);
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- String exceptionAsString = sw.toString();
- return new ReturnType(ResultType.FAILURE,
- "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
- + exceptionAsString);
- }
-
- }
-
- @SuppressWarnings("unchecked")
- public static Map<String, String> getValues(boolean isExists, Map<String, Object> casscadeColumnData,
- Map<String, String> status) {
-
- Map<String, String> value = new HashMap<>();
- Map<String, String> returnMap = new HashMap<>();
- Object key = casscadeColumnData.get("key");
- String setStatus = "";
- value = (Map<String, String>) casscadeColumnData.get("value");
-
- if (isExists)
- setStatus = status.get("exists");
- else
- setStatus = status.get("nonexists");
-
- value.put("status", setStatus);
- JSONObject valueJson = new JSONObject(value);
- returnMap.put(key.toString(), valueJson.toString());
- return returnMap;
-
- }
-
- public static PreparedQueryObject extractQuery(Map<String, Object> valuesMap, TableMetadata tableInfo, String tableName,
- String keySpaceName,String primaryKeyName,String primaryKey,String casscadeColumn,Object casscadeColumnValues) throws Exception {
-
- PreparedQueryObject queryObject = new PreparedQueryObject();
- StringBuilder fieldsString = new StringBuilder("(vector_ts"+",");
- StringBuilder valueString = new StringBuilder("(" + "?" + ",");
- String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
- queryObject.addValue(vector);
- if(casscadeColumn!=null && casscadeColumnValues!=null) {
- fieldsString.append("" +casscadeColumn+" ," );
- valueString.append("?,");
- queryObject.addValue(casscadeColumnValues);
- }
-
- int counter = 0;
- for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(RestMusicDataAPI.class);
+
+ public static ReturnType conditionalInsert(String keyspace, String tablename, String casscadeColumnName,
+ Map<String, Object> casscadeColumnData, String primaryKey, Map<String, Object> valuesMap,
+ Map<String, String> status) throws Exception {
+
+ Map<String, PreparedQueryObject> queryBank = new HashMap<>();
+ TableMetadata tableInfo = null;
+ tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, tablename);
+ DataType primaryIdType = tableInfo.getPrimaryKey().get(0).getType();
+ String primaryId = tableInfo.getPrimaryKey().get(0).getName();
+ DataType casscadeColumnType = tableInfo.getColumn(casscadeColumnName).getType();
+ String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
+
+ PreparedQueryObject select = new PreparedQueryObject();
+ select.appendQueryString("SELECT * FROM " + keyspace + "." + tablename + " where " + primaryId + " = ?");
+ select.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
+ queryBank.put(MusicUtil.SELECT, select);
+
+ PreparedQueryObject update = new PreparedQueryObject();
+ //casscade column values
+ Map<String, String> updateColumnvalues = getValues(true, casscadeColumnData, status);
+ Object formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, updateColumnvalues);
+ update.appendQueryString("UPDATE " + keyspace + "." + tablename + " SET " + casscadeColumnName + " ="
+ + casscadeColumnName + " + ? , vector_ts = ?" + " WHERE " + primaryId + " = ? ");
+ update.addValue(formatedValues);
+ update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector));
+ update.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
+ queryBank.put(MusicUtil.UPDATE, update);
+
+
+ //casscade column values
+ Map<String, String> insertColumnvalues = getValues(false, casscadeColumnData, status);
+ formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, insertColumnvalues);
+ PreparedQueryObject insert = extractQuery(valuesMap, tableInfo, tablename, keyspace, primaryId, primaryKey,casscadeColumnName,formatedValues);
+ queryBank.put(MusicUtil.INSERT, insert);
+
+
+ String key = keyspace + "." + tablename + "." + primaryKey;
+ String lockId = MusicCore.createLockReference(key);
+ long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+ ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
+
+ try {
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ ReturnType criticalPutResult = conditionalInsertAtomic(lockId, keyspace, tablename, primaryKey,
+ queryBank);
+ MusicCore.destroyLockRef(lockId);
+ if (criticalPutResult.getMessage().contains("insert"))
+ criticalPutResult
+ .setMessage("Insert values: ");
+ else if (criticalPutResult.getMessage().contains("update"))
+ criticalPutResult
+ .setMessage("Update values: " + updateColumnvalues);
+ return criticalPutResult;
+
+ } else {
+ MusicCore.destroyLockRef(lockId);
+ return lockAcqResult;
+ }
+ } catch (Exception e) {
+ MusicCore.destroyLockRef(lockId);
+ return new ReturnType(ResultType.FAILURE, e.getMessage());
+ }
+
+ }
+
+ public static ReturnType conditionalInsertAtomic(String lockId, String keyspace, String tableName,
+ String primaryKey, Map<String, PreparedQueryObject> queryBank) {
+
+ ResultSet results = null;
+
+ try {
+ String fullyQualifiedKey = keyspace + "." + tableName + "." + primaryKey;
+ ReturnType lockAcqResult = MusicCore.acquireLock(fullyQualifiedKey, lockId);
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ try {
+ results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT));
+ } catch (Exception e) {
+ return new ReturnType(ResultType.FAILURE, e.getMessage());
+ }
+ if (results.all().isEmpty()) {
+ MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.INSERT), "critical");
+ return new ReturnType(ResultType.SUCCESS, "insert");
+ } else {
+ MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.UPDATE), "critical");
+ return new ReturnType(ResultType.SUCCESS, "update");
+ }
+ } else {
+ return new ReturnType(ResultType.FAILURE,
+ "Cannot perform operation since you are the not the lock holder");
+ }
+
+ } catch (Exception e) {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ String exceptionAsString = sw.toString();
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
+ + exceptionAsString);
+ }
+
+ }
+
+ public static ReturnType update(Map<String,PreparedQueryObject> queryBank, String keyspace, String tableName, String primaryKey,String primaryKeyValue,String planId,String cascadeColumnName,Map<String,String> cascadeColumnValues) throws MusicLockingException, MusicQueryException, MusicServiceException {
+
+ String key = keyspace + "." + tableName + "." + primaryKeyValue;
+ String lockId = MusicCore.createLockReference(key);
+ long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
+ ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
+
+ try {
+
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ ReturnType criticalPutResult = updateAtomic(lockId, keyspace, tableName, primaryKey,primaryKeyValue, queryBank,planId,cascadeColumnValues,cascadeColumnName);
+ MusicCore.destroyLockRef(lockId);
+ return criticalPutResult;
+ } else {
+ MusicCore.destroyLockRef(lockId);
+ return lockAcqResult;
+ }
+
+ } catch (Exception e) {
+ MusicCore.destroyLockRef(lockId);
+ return new ReturnType(ResultType.FAILURE, e.getMessage());
+
+ }
+ }
+
+ public static ReturnType updateAtomic(String lockId, String keyspace, String tableName, String primaryKey,String primaryKeyValue,
+ Map<String,PreparedQueryObject> queryBank,String planId,Map<String,String> cascadeColumnValues,String casscadeColumnName) {
+ try {
+ String fullyQualifiedKey = keyspace + "." + tableName + "." + primaryKeyValue;
+ ReturnType lockAcqResult = MusicCore.acquireLock(fullyQualifiedKey, lockId);
+
+ if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
+ Row row = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT)).one();
+
+ if(row != null) {
+ Map<String, String> updatedValues = cascadeColumnUpdateSpecific(row, cascadeColumnValues, casscadeColumnName, planId);
+ JSONObject json = new JSONObject(updatedValues);
+ PreparedQueryObject update = new PreparedQueryObject();
+ String vector_ts = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
+ update.appendQueryString("UPDATE " + keyspace + "." + tableName + " SET " + casscadeColumnName + "['" + planId
+ + "'] = ?, vector_ts = ? WHERE " + primaryKey + " = ?");
+ update.addValue(MusicUtil.convertToActualDataType(DataType.text(), json.toString()));
+ update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector_ts));
+ update.addValue(MusicUtil.convertToActualDataType(DataType.text(), primaryKeyValue));
+ try {
+ MusicDataStoreHandle.getDSHandle().executePut(update, "critical");
+ } catch (Exception ex) {
+ return new ReturnType(ResultType.FAILURE, ex.getMessage());
+ }
+ }else {
+ return new ReturnType(ResultType.FAILURE,"Cannot find data related to key: "+primaryKey);
+ }
+ MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.UPSERT), "critical");
+ return new ReturnType(ResultType.SUCCESS, "update success");
+
+ } else {
+ return new ReturnType(ResultType.FAILURE,
+ "Cannot perform operation since you are the not the lock holder");
+ }
+
+ } catch (Exception e) {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ String exceptionAsString = sw.toString();
+ return new ReturnType(ResultType.FAILURE,
+ "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
+ + exceptionAsString);
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map<String, String> getValues(boolean isExists, Map<String, Object> casscadeColumnData,
+ Map<String, String> status) {
+
+ Map<String, String> returnMap = new HashMap<>();
+ Object key = casscadeColumnData.get("key");
+ String setStatus = "";
+ Map<String, String> value = (Map<String, String>) casscadeColumnData.get("value");
+
+ if (isExists)
+ setStatus = status.get("exists");
+ else
+ setStatus = status.get("nonexists");
+
+ value.put("status", setStatus);
+ JSONObject valueJson = new JSONObject(value);
+ returnMap.put(key.toString(), valueJson.toString());
+ return returnMap;
+
+ }
+
+ public static PreparedQueryObject extractQuery(Map<String, Object> valuesMap, TableMetadata tableInfo, String tableName,
+ String keySpaceName,String primaryKeyName,String primaryKey,String casscadeColumn,Object casscadeColumnValues) throws Exception {
+
+ PreparedQueryObject queryObject = new PreparedQueryObject();
+ StringBuilder fieldsString = new StringBuilder("(vector_ts"+",");
+ StringBuilder valueString = new StringBuilder("(" + "?" + ",");
+ String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
+ String localPrimaryKey;
+ queryObject.addValue(vector);
+ if(casscadeColumn!=null && casscadeColumnValues!=null) {
+ fieldsString.append(casscadeColumn).append(" ,");
+ valueString.append("?,");
+ queryObject.addValue(casscadeColumnValues);
+ }
+
+ int counter = 0;
+ for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {