2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * Modifications Copyright (C) 2019 IBM.
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
20 * ============LICENSE_END=============================================
21 * ====================================================================
24 package org.onap.music.conductor.conditionals;
26 import java.io.PrintWriter;
27 import java.io.StringWriter;
28 import java.util.HashMap;
31 import org.codehaus.jettison.json.JSONObject;
32 import org.onap.music.datastore.MusicDataStoreHandle;
33 import org.onap.music.datastore.PreparedQueryObject;
34 import org.onap.music.eelf.logging.EELFLoggerDelegate;
35 import org.onap.music.eelf.logging.format.AppMessages;
36 import org.onap.music.eelf.logging.format.ErrorSeverity;
37 import org.onap.music.eelf.logging.format.ErrorTypes;
38 import org.onap.music.exceptions.MusicLockingException;
39 import org.onap.music.exceptions.MusicQueryException;
40 import org.onap.music.exceptions.MusicServiceException;
41 import org.onap.music.main.MusicCore;
42 import org.onap.music.main.MusicUtil;
43 import org.onap.music.main.ResultType;
44 import org.onap.music.main.ReturnType;
45 import org.onap.music.rest.RestMusicDataAPI;
47 import com.datastax.driver.core.ColumnDefinitions;
48 import com.datastax.driver.core.DataType;
49 import com.datastax.driver.core.ResultSet;
50 import com.datastax.driver.core.Row;
51 import com.datastax.driver.core.TableMetadata;
53 public class MusicConditional {
54 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(RestMusicDataAPI.class);
56 public static ReturnType conditionalInsert(String keyspace, String tablename, String casscadeColumnName,
57 Map<String, Object> casscadeColumnData, String primaryKey, Map<String, Object> valuesMap,
58 Map<String, String> status) throws Exception {
60 Map<String, PreparedQueryObject> queryBank = new HashMap<>();
61 TableMetadata tableInfo = null;
62 tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, tablename);
63 DataType primaryIdType = tableInfo.getPrimaryKey().get(0).getType();
64 String primaryId = tableInfo.getPrimaryKey().get(0).getName();
65 DataType casscadeColumnType = tableInfo.getColumn(casscadeColumnName).getType();
66 String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
68 PreparedQueryObject select = new PreparedQueryObject();
69 select.appendQueryString("SELECT * FROM " + keyspace + "." + tablename + " where " + primaryId + " = ?");
70 select.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
71 queryBank.put(MusicUtil.SELECT, select);
73 PreparedQueryObject update = new PreparedQueryObject();
74 //casscade column values
75 Map<String, String> updateColumnvalues = getValues(true, casscadeColumnData, status);
76 Object formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, updateColumnvalues);
77 update.appendQueryString("UPDATE " + keyspace + "." + tablename + " SET " + casscadeColumnName + " ="
78 + casscadeColumnName + " + ? , vector_ts = ?" + " WHERE " + primaryId + " = ? ");
79 update.addValue(formatedValues);
80 update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector));
81 update.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
82 queryBank.put(MusicUtil.UPDATE, update);
85 //casscade column values
86 Map<String, String> insertColumnvalues = getValues(false, casscadeColumnData, status);
87 formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, insertColumnvalues);
88 PreparedQueryObject insert = extractQuery(valuesMap, tableInfo, tablename, keyspace, primaryId, primaryKey,casscadeColumnName,formatedValues);
89 queryBank.put(MusicUtil.INSERT, insert);
92 String key = keyspace + "." + tablename + "." + primaryKey;
93 String lockId = MusicCore.createLockReference(key);
94 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
95 ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
98 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
99 ReturnType criticalPutResult = conditionalInsertAtomic(lockId, keyspace, tablename, primaryKey,
101 MusicCore.destroyLockRef(lockId);
102 if (criticalPutResult.getMessage().contains("insert"))
104 .setMessage("Insert values: ");
105 else if (criticalPutResult.getMessage().contains("update"))
107 .setMessage("Update values: " + updateColumnvalues);
108 return criticalPutResult;
111 MusicCore.destroyLockRef(lockId);
112 return lockAcqResult;
114 } catch (Exception e) {
115 MusicCore.destroyLockRef(lockId);
116 return new ReturnType(ResultType.FAILURE, e.getMessage());
121 public static ReturnType conditionalInsertAtomic(String lockId, String keyspace, String tableName,
122 String primaryKey, Map<String, PreparedQueryObject> queryBank) {
124 ResultSet results = null;
127 String fullyQualifiedKey = keyspace + "." + tableName + "." + primaryKey;
128 ReturnType lockAcqResult = MusicCore.acquireLock(fullyQualifiedKey, lockId);
129 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
131 results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT));
132 } catch (Exception e) {
133 return new ReturnType(ResultType.FAILURE, e.getMessage());
135 if (results.all().isEmpty()) {
136 MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.INSERT), "critical");
137 return new ReturnType(ResultType.SUCCESS, "insert");
139 MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.UPDATE), "critical");
140 return new ReturnType(ResultType.SUCCESS, "update");
143 return new ReturnType(ResultType.FAILURE,
144 "Cannot perform operation since you are the not the lock holder");
147 } catch (Exception e) {
148 StringWriter sw = new StringWriter();
149 e.printStackTrace(new PrintWriter(sw));
150 String exceptionAsString = sw.toString();
151 return new ReturnType(ResultType.FAILURE,
152 "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
153 + exceptionAsString);
158 public static ReturnType update(Map<String,PreparedQueryObject> queryBank, String keyspace, String tableName, String primaryKey,String primaryKeyValue,String planId,String cascadeColumnName,Map<String,String> cascadeColumnValues) throws MusicLockingException, MusicQueryException, MusicServiceException {
160 String key = keyspace + "." + tableName + "." + primaryKeyValue;
161 String lockId = MusicCore.createLockReference(key);
162 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
163 ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
167 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
168 ReturnType criticalPutResult = updateAtomic(lockId, keyspace, tableName, primaryKey,primaryKeyValue, queryBank,planId,cascadeColumnValues,cascadeColumnName);
169 MusicCore.destroyLockRef(lockId);
170 return criticalPutResult;
172 MusicCore.destroyLockRef(lockId);
173 return lockAcqResult;
176 } catch (Exception e) {
177 MusicCore.destroyLockRef(lockId);
178 return new ReturnType(ResultType.FAILURE, e.getMessage());
183 public static ReturnType updateAtomic(String lockId, String keyspace, String tableName, String primaryKey,String primaryKeyValue,
184 Map<String,PreparedQueryObject> queryBank,String planId,Map<String,String> cascadeColumnValues,String casscadeColumnName) {
186 String fullyQualifiedKey = keyspace + "." + tableName + "." + primaryKeyValue;
187 ReturnType lockAcqResult = MusicCore.acquireLock(fullyQualifiedKey, lockId);
189 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
190 Row row = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT)).one();
193 Map<String, String> updatedValues = cascadeColumnUpdateSpecific(row, cascadeColumnValues, casscadeColumnName, planId);
194 JSONObject json = new JSONObject(updatedValues);
195 PreparedQueryObject update = new PreparedQueryObject();
196 String vector_ts = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
197 update.appendQueryString("UPDATE " + keyspace + "." + tableName + " SET " + casscadeColumnName + "['" + planId
198 + "'] = ?, vector_ts = ? WHERE " + primaryKey + " = ?");
199 update.addValue(MusicUtil.convertToActualDataType(DataType.text(), json.toString()));
200 update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector_ts));
201 update.addValue(MusicUtil.convertToActualDataType(DataType.text(), primaryKeyValue));
203 MusicDataStoreHandle.getDSHandle().executePut(update, "critical");
204 } catch (Exception ex) {
205 return new ReturnType(ResultType.FAILURE, ex.getMessage());
208 return new ReturnType(ResultType.FAILURE,"Cannot find data related to key: "+primaryKey);
210 MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.UPSERT), "critical");
211 return new ReturnType(ResultType.SUCCESS, "update success");
214 return new ReturnType(ResultType.FAILURE,
215 "Cannot perform operation since you are the not the lock holder");
218 } catch (Exception e) {
219 StringWriter sw = new StringWriter();
220 e.printStackTrace(new PrintWriter(sw));
221 String exceptionAsString = sw.toString();
222 return new ReturnType(ResultType.FAILURE,
223 "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
224 + exceptionAsString);
229 @SuppressWarnings("unchecked")
230 public static Map<String, String> getValues(boolean isExists, Map<String, Object> casscadeColumnData,
231 Map<String, String> status) {
233 Map<String, String> returnMap = new HashMap<>();
234 Object key = casscadeColumnData.get("key");
235 String setStatus = "";
236 Map<String, String> value = (Map<String, String>) casscadeColumnData.get("value");
239 setStatus = status.get("exists");
241 setStatus = status.get("nonexists");
243 value.put("status", setStatus);
244 JSONObject valueJson = new JSONObject(value);
245 returnMap.put(key.toString(), valueJson.toString());
250 public static PreparedQueryObject extractQuery(Map<String, Object> valuesMap, TableMetadata tableInfo, String tableName,
251 String keySpaceName,String primaryKeyName,String primaryKey,String casscadeColumn,Object casscadeColumnValues) throws Exception {
253 PreparedQueryObject queryObject = new PreparedQueryObject();
254 StringBuilder fieldsString = new StringBuilder("(vector_ts"+",");
255 StringBuilder valueString = new StringBuilder("(" + "?" + ",");
256 String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
257 String localPrimaryKey;
258 queryObject.addValue(vector);
259 if(casscadeColumn!=null && casscadeColumnValues!=null) {
260 fieldsString.append(casscadeColumn).append(" ,");
261 valueString.append("?,");
262 queryObject.addValue(casscadeColumnValues);
266 for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
268 fieldsString.append(entry.getKey());
269 Object valueObj = entry.getValue();
270 if (primaryKeyName.equals(entry.getKey())) {
271 localPrimaryKey = entry.getValue() + "";
272 localPrimaryKey = localPrimaryKey.replace("'", "''");
274 DataType colType = null;
276 colType = tableInfo.getColumn(entry.getKey()).getType();
277 } catch(NullPointerException ex) {
278 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() +" Invalid column name : "+entry.getKey(), AppMessages.INCORRECTDATA ,ErrorSeverity.CRITICAL, ErrorTypes.DATAERROR);
282 Object formattedValue = null;
284 formattedValue = MusicUtil.convertToActualDataType(colType, valueObj);
285 } catch (Exception e) {
286 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
289 valueString.append("?");
290 queryObject.addValue(formattedValue);
293 if (counter == valuesMap.size() - 1) {
294 fieldsString.append(")");
295 valueString.append(")");
297 fieldsString.append(",");
298 valueString.append(",");
300 counter = counter + 1;
302 queryObject.appendQueryString("INSERT INTO " + keySpaceName + "." + tableName + " "
303 + fieldsString + " VALUES " + valueString);
307 public static Object getColValue(Row row, String colName, DataType colType) {
308 switch (colType.getName()) {
310 return row.getString(colName);
312 return row.getUUID(colName);
314 return row.getVarint(colName);
316 return row.getLong(colName);
318 return row.getInt(colName);
320 return row.getFloat(colName);
322 return row.getDouble(colName);
324 return row.getBool(colName);
326 return row.getMap(colName, String.class, String.class);
332 @SuppressWarnings("unchecked")
333 public static Map<String, String> cascadeColumnUpdateSpecific(Row row, Map<String, String> changeOfStatus,
334 String cascadeColumnName, String planId) {
336 ColumnDefinitions colInfo = row.getColumnDefinitions();
337 DataType colType = colInfo.getType(cascadeColumnName);
338 Object columnValue = getColValue(row, cascadeColumnName, colType);
340 Map<String, String> finalValues = new HashMap<>();
341 Map<String, String> values = (Map<String, String>) columnValue;
342 if (values != null && values.keySet().contains(planId)) {
343 String valueString = values.get(planId);
344 String tempValueString = valueString.replaceAll("\\{", "").replaceAll("\"", "").replaceAll("\\}", "");
345 String[] elements = tempValueString.split(",");
346 for (String str : elements) {
347 String[] keyValue = str.split(":");
348 if ((changeOfStatus.keySet().contains(keyValue[0].replaceAll("\\s", ""))))
349 keyValue[1] = changeOfStatus.get(keyValue[0].replaceAll("\\s", ""));
350 finalValues.put(keyValue[0], keyValue[1]);