2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright (c) 2017 AT&T Intellectual Property
6 * ===================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * ============LICENSE_END=============================================
20 * ====================================================================
22 package org.onap.music.conductor.conditionals;
24 import java.io.PrintWriter;
25 import java.io.StringWriter;
26 import java.util.HashMap;
29 import org.codehaus.jettison.json.JSONObject;
30 import org.onap.music.datastore.PreparedQueryObject;
31 import org.onap.music.eelf.logging.EELFLoggerDelegate;
32 import org.onap.music.eelf.logging.format.AppMessages;
33 import org.onap.music.eelf.logging.format.ErrorSeverity;
34 import org.onap.music.eelf.logging.format.ErrorTypes;
35 import org.onap.music.lockingservice.MusicLockState;
36 import org.onap.music.main.MusicCore;
37 import org.onap.music.main.MusicUtil;
38 import org.onap.music.main.ResultType;
39 import org.onap.music.main.ReturnType;
40 import org.onap.music.rest.RestMusicDataAPI;
42 import com.datastax.driver.core.ColumnDefinitions;
43 import com.datastax.driver.core.DataType;
44 import com.datastax.driver.core.ResultSet;
45 import com.datastax.driver.core.Row;
46 import com.datastax.driver.core.TableMetadata;
48 public class MusicConditional {
49 private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(RestMusicDataAPI.class);
51 public static ReturnType conditionalInsert(String keyspace, String tablename, String casscadeColumnName,
52 Map<String, Object> casscadeColumnData, String primaryKey, Map<String, Object> valuesMap,
53 Map<String, String> status) throws Exception {
55 Map<String, PreparedQueryObject> queryBank = new HashMap<>();
56 TableMetadata tableInfo = null;
57 tableInfo = MusicCore.returnColumnMetadata(keyspace, tablename);
58 DataType primaryIdType = tableInfo.getPrimaryKey().get(0).getType();
59 String primaryId = tableInfo.getPrimaryKey().get(0).getName();
60 DataType casscadeColumnType = tableInfo.getColumn(casscadeColumnName).getType();
61 String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
63 PreparedQueryObject select = new PreparedQueryObject();
64 select.appendQueryString("SELECT * FROM " + keyspace + "." + tablename + " where " + primaryId + " = ?");
65 select.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
66 queryBank.put(MusicUtil.SELECT, select);
68 PreparedQueryObject update = new PreparedQueryObject();
69 Map<String, String> updateColumnvalues = new HashMap<>(); //casscade column values
70 updateColumnvalues = getValues(true, casscadeColumnData, status);
71 Object formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, updateColumnvalues);
72 update.appendQueryString("UPDATE " + keyspace + "." + tablename + " SET " + casscadeColumnName + " ="
73 + casscadeColumnName + " + ? , vector_ts = ?" + " WHERE " + primaryId + " = ? ");
74 update.addValue(formatedValues);
75 update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector));
76 update.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
77 queryBank.put(MusicUtil.UPDATE, update);
80 Map<String, String> insertColumnvalues = new HashMap<>();//casscade column values
81 insertColumnvalues = getValues(false, casscadeColumnData, status);
82 formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, insertColumnvalues);
83 PreparedQueryObject insert = extractQuery(valuesMap, tableInfo, tablename, keyspace, primaryId, primaryKey,casscadeColumnName,formatedValues);
84 queryBank.put(MusicUtil.INSERT, insert);
87 String key = keyspace + "." + tablename + "." + primaryKey;
88 String lockId = MusicCore.createLockReference(key);
89 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
90 ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
93 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
94 ReturnType criticalPutResult = conditionalInsertAtomic(lockId, keyspace, tablename, primaryKey,
96 MusicCore.destroyLockRef(lockId);
97 if (criticalPutResult.getMessage().contains("insert"))
99 .setMessage("Insert values: ");
100 else if (criticalPutResult.getMessage().contains("update"))
102 .setMessage("Update values: " + updateColumnvalues);
103 return criticalPutResult;
106 MusicCore.destroyLockRef(lockId);
107 return lockAcqResult;
109 } catch (Exception e) {
110 MusicCore.destroyLockRef(lockId);
111 return new ReturnType(ResultType.FAILURE, e.getMessage());
116 public static ReturnType conditionalInsertAtomic(String lockId, String keyspace, String tableName,
117 String primaryKey, Map<String, PreparedQueryObject> queryBank) {
119 ResultSet results = null;
123 MusicLockState mls = MusicCore.getLockingServiceHandle()
124 .getLockState(keyspace + "." + tableName + "." + primaryKey);
125 if (mls.getLockHolder().equals(lockId) == true) {
127 results = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT));
128 } catch (Exception e) {
129 return new ReturnType(ResultType.FAILURE, e.getMessage());
131 if (results.all().isEmpty()) {
132 MusicCore.getDSHandle().executePut(queryBank.get(MusicUtil.INSERT), "critical");
133 return new ReturnType(ResultType.SUCCESS, "insert");
135 MusicCore.getDSHandle().executePut(queryBank.get(MusicUtil.UPDATE), "critical");
136 return new ReturnType(ResultType.SUCCESS, "update");
139 return new ReturnType(ResultType.FAILURE,
140 "Cannot perform operation since you are the not the lock holder");
143 } catch (Exception e) {
144 StringWriter sw = new StringWriter();
145 e.printStackTrace(new PrintWriter(sw));
146 String exceptionAsString = sw.toString();
147 return new ReturnType(ResultType.FAILURE,
148 "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
149 + exceptionAsString);
154 public static ReturnType update(Map<String,PreparedQueryObject> queryBank, String keyspace, String tableName, String primaryKey,String primaryKeyValue,String planId,String cascadeColumnName,Map<String,String> cascadeColumnValues) {
156 String key = keyspace + "." + tableName + "." + primaryKeyValue;
157 String lockId = MusicCore.createLockReference(key);
158 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
159 ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
163 if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
164 return updateAtomic(lockId, keyspace, tableName, primaryKey,primaryKeyValue, queryBank,planId,cascadeColumnValues,cascadeColumnName);
167 MusicCore.destroyLockRef(lockId);
168 return lockAcqResult;
171 } catch (Exception e) {
172 MusicCore.destroyLockRef(lockId);
173 return new ReturnType(ResultType.FAILURE, e.getMessage());
178 public static ReturnType updateAtomic(String lockId, String keyspace, String tableName, String primaryKey,String primaryKeyValue,
179 Map<String,PreparedQueryObject> queryBank,String planId,Map<String,String> cascadeColumnValues,String casscadeColumnName) {
182 MusicLockState mls = MusicCore.getLockingServiceHandle()
183 .getLockState(keyspace + "." + tableName + "." + primaryKeyValue);
184 if (mls.getLockHolder().equals(lockId) == true) {
185 Row row = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT)).one();
188 Map<String, String> updatedValues = cascadeColumnUpdateSpecific(row, cascadeColumnValues, casscadeColumnName, planId);
189 JSONObject json = new JSONObject(updatedValues);
190 PreparedQueryObject update = new PreparedQueryObject();
191 String vector_ts = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
192 update.appendQueryString("UPDATE " + keyspace + "." + tableName + " SET " + casscadeColumnName + "['" + planId
193 + "'] = ?, vector_ts = ? WHERE " + primaryKey + " = ?");
194 update.addValue(MusicUtil.convertToActualDataType(DataType.text(), json.toString()));
195 update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector_ts));
196 update.addValue(MusicUtil.convertToActualDataType(DataType.text(), primaryKeyValue));
198 MusicCore.getDSHandle().executePut(update, "critical");
199 } catch (Exception ex) {
200 return new ReturnType(ResultType.FAILURE, ex.getMessage());
203 return new ReturnType(ResultType.FAILURE,"Cannot find data related to key: "+primaryKey);
205 MusicCore.getDSHandle().executePut(queryBank.get(MusicUtil.UPSERT), "critical");
206 return new ReturnType(ResultType.SUCCESS, "update success");
209 return new ReturnType(ResultType.FAILURE,
210 "Cannot perform operation since you are the not the lock holder");
213 } catch (Exception e) {
214 StringWriter sw = new StringWriter();
215 e.printStackTrace(new PrintWriter(sw));
216 String exceptionAsString = sw.toString();
217 return new ReturnType(ResultType.FAILURE,
218 "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
219 + exceptionAsString);
224 @SuppressWarnings("unchecked")
225 public static Map<String, String> getValues(boolean isExists, Map<String, Object> casscadeColumnData,
226 Map<String, String> status) {
228 Map<String, String> value = new HashMap<>();
229 Map<String, String> returnMap = new HashMap<>();
230 Object key = casscadeColumnData.get("key");
231 String setStatus = "";
232 value = (Map<String, String>) casscadeColumnData.get("value");
235 setStatus = status.get("exists");
237 setStatus = status.get("nonexists");
239 value.put("status", setStatus);
240 JSONObject valueJson = new JSONObject(value);
241 returnMap.put(key.toString(), valueJson.toString());
246 public static PreparedQueryObject extractQuery(Map<String, Object> valuesMap, TableMetadata tableInfo, String tableName,
247 String keySpaceName,String primaryKeyName,String primaryKey,String casscadeColumn,Object casscadeColumnValues) throws Exception {
249 PreparedQueryObject queryObject = new PreparedQueryObject();
250 StringBuilder fieldsString = new StringBuilder("(vector_ts"+",");
251 StringBuilder valueString = new StringBuilder("(" + "?" + ",");
252 String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
253 queryObject.addValue(vector);
254 if(casscadeColumn!=null && casscadeColumnValues!=null) {
255 fieldsString.append("" +casscadeColumn+" ," );
256 valueString.append("?,");
257 queryObject.addValue(casscadeColumnValues);
261 for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
263 fieldsString.append("" + entry.getKey());
264 Object valueObj = entry.getValue();
265 if (primaryKeyName.equals(entry.getKey())) {
266 primaryKey = entry.getValue() + "";
267 primaryKey = primaryKey.replace("'", "''");
269 DataType colType = null;
271 colType = tableInfo.getColumn(entry.getKey()).getType();
272 } catch(NullPointerException ex) {
273 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() +" Invalid column name : "+entry.getKey(), AppMessages.INCORRECTDATA ,ErrorSeverity.CRITICAL, ErrorTypes.DATAERROR);
277 Object formattedValue = null;
279 formattedValue = MusicUtil.convertToActualDataType(colType, valueObj);
280 } catch (Exception e) {
281 logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
284 valueString.append("?");
285 queryObject.addValue(formattedValue);
288 if (counter == valuesMap.size() - 1) {
289 fieldsString.append(")");
290 valueString.append(")");
292 fieldsString.append(",");
293 valueString.append(",");
295 counter = counter + 1;
297 queryObject.appendQueryString("INSERT INTO " + keySpaceName + "." + tableName + " "
298 + fieldsString + " VALUES " + valueString);
302 public static Object getColValue(Row row, String colName, DataType colType) {
303 switch (colType.getName()) {
305 return row.getString(colName);
307 return row.getUUID(colName);
309 return row.getVarint(colName);
311 return row.getLong(colName);
313 return row.getInt(colName);
315 return row.getFloat(colName);
317 return row.getDouble(colName);
319 return row.getBool(colName);
321 return row.getMap(colName, String.class, String.class);
327 @SuppressWarnings("unchecked")
328 public static Map<String, String> cascadeColumnUpdateSpecific(Row row, Map<String, String> changeOfStatus,
329 String cascadeColumnName, String planId) {
331 ColumnDefinitions colInfo = row.getColumnDefinitions();
332 DataType colType = colInfo.getType(cascadeColumnName);
333 Map<String, String> values = new HashMap<>();
334 Object columnValue = getColValue(row, cascadeColumnName, colType);
336 Map<String, String> finalValues = new HashMap<>();
337 values = (Map<String, String>) columnValue;
338 if (values.keySet().contains(planId)) {
339 String valueString = values.get(planId);
340 String tempValueString = valueString.replaceAll("\\{", "").replaceAll("\"", "").replaceAll("\\}", "");
341 String[] elements = tempValueString.split(",");
342 for (String str : elements) {
343 String[] keyValue = str.split(":");
344 if ((changeOfStatus.keySet().contains(keyValue[0].replaceAll("\\s", ""))))
345 keyValue[1] = changeOfStatus.get(keyValue[0].replaceAll("\\s", ""));
346 finalValues.put(keyValue[0], keyValue[1]);