c97ccfabe158dd2cc352d61a89393140dadd3297
[music.git] / src / main / java / org / onap / music / conductor / conditionals / MusicConditional.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  * 
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  * 
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22 package org.onap.music.conductor.conditionals;
23
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.onap.music.datastore.PreparedQueryObject;
import org.onap.music.eelf.logging.EELFLoggerDelegate;
import org.onap.music.eelf.logging.format.AppMessages;
import org.onap.music.eelf.logging.format.ErrorSeverity;
import org.onap.music.eelf.logging.format.ErrorTypes;
import org.onap.music.lockingservice.MusicLockState;
import org.onap.music.main.MusicCore;
import org.onap.music.main.MusicUtil;
import org.onap.music.main.ResultType;
import org.onap.music.main.ReturnType;
import org.onap.music.rest.RestMusicDataAPI;

import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.TableMetadata;
47
48 public class MusicConditional {
49         private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(RestMusicDataAPI.class);
50
51         public static ReturnType conditionalInsert(String keyspace, String tablename, String casscadeColumnName,
52                         Map<String, Object> casscadeColumnData, String primaryKey, Map<String, Object> valuesMap,
53                         Map<String, String> status) throws Exception {
54
55                 Map<String, PreparedQueryObject> queryBank = new HashMap<>();
56                 TableMetadata tableInfo = null;
57                 tableInfo = MusicCore.returnColumnMetadata(keyspace, tablename);
58                 DataType primaryIdType = tableInfo.getPrimaryKey().get(0).getType();
59                 String primaryId = tableInfo.getPrimaryKey().get(0).getName();
60                 DataType casscadeColumnType = tableInfo.getColumn(casscadeColumnName).getType();
61                 String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
62
63                 PreparedQueryObject select = new PreparedQueryObject();
64                 select.appendQueryString("SELECT * FROM " + keyspace + "." + tablename + " where " + primaryId + " = ?");
65                 select.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
66                 queryBank.put(MusicUtil.SELECT, select);
67
68                 PreparedQueryObject update = new PreparedQueryObject();
69                 Map<String, String> updateColumnvalues = new HashMap<>(); //casscade column values
70                 updateColumnvalues = getValues(true, casscadeColumnData, status);
71                 Object formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, updateColumnvalues);
72                 update.appendQueryString("UPDATE " + keyspace + "." + tablename + " SET " + casscadeColumnName + " ="
73                                 + casscadeColumnName + " + ? , vector_ts = ?" + " WHERE " + primaryId + " = ? ");
74                 update.addValue(formatedValues);
75                 update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector));
76                 update.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
77                 queryBank.put(MusicUtil.UPDATE, update);
78
79
80                 Map<String, String> insertColumnvalues = new HashMap<>();//casscade column values
81                 insertColumnvalues = getValues(false, casscadeColumnData, status);
82                 formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, insertColumnvalues);
83                 PreparedQueryObject insert = extractQuery(valuesMap, tableInfo, tablename, keyspace, primaryId, primaryKey,casscadeColumnName,formatedValues);
84                 queryBank.put(MusicUtil.INSERT, insert);
85                 
86                 
87                 String key = keyspace + "." + tablename + "." + primaryKey;
88                 String lockId = MusicCore.createLockReference(key);
89                 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
90                 ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
91
92                 try {
93                         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
94                                 ReturnType criticalPutResult = conditionalInsertAtomic(lockId, keyspace, tablename, primaryKey,
95                                                 queryBank);
96                                 MusicCore.destroyLockRef(lockId);
97                                 if (criticalPutResult.getMessage().contains("insert"))
98                                         criticalPutResult
99                                                         .setMessage("Insert values: ");
100                                 else if (criticalPutResult.getMessage().contains("update"))
101                                         criticalPutResult
102                                                         .setMessage("Update values: " + updateColumnvalues);
103                                 return criticalPutResult;
104
105                         } else {
106                                 MusicCore.destroyLockRef(lockId);
107                                 return lockAcqResult;
108                         }
109                 } catch (Exception e) {
110                         MusicCore.destroyLockRef(lockId);
111                         return new ReturnType(ResultType.FAILURE, e.getMessage());
112                 }
113
114         }
115
116         public static ReturnType conditionalInsertAtomic(String lockId, String keyspace, String tableName,
117                         String primaryKey, Map<String, PreparedQueryObject> queryBank) {
118
119                 ResultSet results = null;
120
121                 try {
122
123                         MusicLockState mls = MusicCore.getLockingServiceHandle()
124                                         .getLockState(keyspace + "." + tableName + "." + primaryKey);
125                         if (mls.getLockHolder().equals(lockId) == true) {
126                                 try {
127                                         results = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT));
128                                 } catch (Exception e) {
129                                         return new ReturnType(ResultType.FAILURE, e.getMessage());
130                                 }
131                                 if (results.all().isEmpty()) {
132                                         MusicCore.getDSHandle().executePut(queryBank.get(MusicUtil.INSERT), "critical");
133                                         return new ReturnType(ResultType.SUCCESS, "insert");
134                                 } else {
135                                         MusicCore.getDSHandle().executePut(queryBank.get(MusicUtil.UPDATE), "critical");
136                                         return new ReturnType(ResultType.SUCCESS, "update");
137                                 }
138                         } else {
139                                 return new ReturnType(ResultType.FAILURE,
140                                                 "Cannot perform operation since you are the not the lock holder");
141                         }
142
143                 } catch (Exception e) {
144                         StringWriter sw = new StringWriter();
145                         e.printStackTrace(new PrintWriter(sw));
146                         String exceptionAsString = sw.toString();
147                         return new ReturnType(ResultType.FAILURE,
148                                         "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
149                                                         + exceptionAsString);
150                 }
151
152         }
153
154         public static ReturnType update(Map<String,PreparedQueryObject> queryBank, String keyspace, String tableName, String primaryKey,String primaryKeyValue,String planId,String cascadeColumnName,Map<String,String> cascadeColumnValues) {
155
156                 String key = keyspace + "." + tableName + "." + primaryKeyValue;
157                 String lockId = MusicCore.createLockReference(key);
158                 long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
159                 ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
160
161                 try {
162
163                         if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
164                                 return updateAtomic(lockId, keyspace, tableName, primaryKey,primaryKeyValue, queryBank,planId,cascadeColumnValues,cascadeColumnName);
165
166                         } else {
167                                 MusicCore.destroyLockRef(lockId);
168                                 return lockAcqResult;
169                         }
170
171                 } catch (Exception e) {
172                         MusicCore.destroyLockRef(lockId);
173                         return new ReturnType(ResultType.FAILURE, e.getMessage());
174
175                 }
176         }
177
178         public static ReturnType updateAtomic(String lockId, String keyspace, String tableName, String primaryKey,String primaryKeyValue,
179                         Map<String,PreparedQueryObject> queryBank,String planId,Map<String,String> cascadeColumnValues,String casscadeColumnName) {
180                 try {
181
182                         MusicLockState mls = MusicCore.getLockingServiceHandle()
183                                         .getLockState(keyspace + "." + tableName + "." + primaryKeyValue);
184                         if (mls.getLockHolder().equals(lockId) == true) {
185                                 Row row  = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT)).one();
186                                 
187                                 if(row != null) {
188                                         Map<String, String> updatedValues = cascadeColumnUpdateSpecific(row, cascadeColumnValues, casscadeColumnName, planId);
189                                         JSONObject json = new JSONObject(updatedValues);
190                                         PreparedQueryObject update = new PreparedQueryObject();
191                                         String vector_ts = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
192                                         update.appendQueryString("UPDATE " + keyspace + "." + tableName + " SET " + casscadeColumnName + "['" + planId
193                                                         + "'] = ?, vector_ts = ? WHERE " + primaryKey + " = ?");
194                                         update.addValue(MusicUtil.convertToActualDataType(DataType.text(), json.toString()));
195                                         update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector_ts));
196                                         update.addValue(MusicUtil.convertToActualDataType(DataType.text(), primaryKeyValue));
197                                         try {
198                                                 MusicCore.getDSHandle().executePut(update, "critical");
199                                         } catch (Exception ex) {
200                                                 return new ReturnType(ResultType.FAILURE, ex.getMessage());
201                                         }
202                                 }else {
203                                         return new ReturnType(ResultType.FAILURE,"Cannot find data related to key: "+primaryKey);
204                                 }
205                                 MusicCore.getDSHandle().executePut(queryBank.get(MusicUtil.UPSERT), "critical");
206                                 return new ReturnType(ResultType.SUCCESS, "update success");
207
208                         } else {
209                                 return new ReturnType(ResultType.FAILURE,
210                                                 "Cannot perform operation since you are the not the lock holder");
211                         }
212
213                 } catch (Exception e) {
214                         StringWriter sw = new StringWriter();
215                         e.printStackTrace(new PrintWriter(sw));
216                         String exceptionAsString = sw.toString();
217                         return new ReturnType(ResultType.FAILURE,
218                                         "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
219                                                         + exceptionAsString);
220                 }
221
222         }
223
224         @SuppressWarnings("unchecked")
225         public static Map<String, String> getValues(boolean isExists, Map<String, Object> casscadeColumnData,
226                         Map<String, String> status) {
227
228                 Map<String, String> value = new HashMap<>();
229                 Map<String, String> returnMap = new HashMap<>();
230                 Object key = casscadeColumnData.get("key");
231                 String setStatus = "";
232                 value = (Map<String, String>) casscadeColumnData.get("value");
233
234                 if (isExists)
235                         setStatus = status.get("exists");
236                 else
237                         setStatus = status.get("nonexists");
238
239                 value.put("status", setStatus);
240                 JSONObject valueJson = new JSONObject(value);
241                 returnMap.put(key.toString(), valueJson.toString());
242                 return returnMap;
243
244         }
245         
246         public static PreparedQueryObject extractQuery(Map<String, Object> valuesMap, TableMetadata tableInfo, String tableName,
247                         String keySpaceName,String primaryKeyName,String primaryKey,String casscadeColumn,Object casscadeColumnValues) throws Exception {
248
249                 PreparedQueryObject queryObject = new PreparedQueryObject();
250                 StringBuilder fieldsString = new StringBuilder("(vector_ts"+",");
251                 StringBuilder valueString = new StringBuilder("(" + "?" + ",");
252                 String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
253                 queryObject.addValue(vector);
254                 if(casscadeColumn!=null && casscadeColumnValues!=null) {
255                         fieldsString.append("" +casscadeColumn+" ," );
256                   valueString.append("?,");
257                   queryObject.addValue(casscadeColumnValues);
258                 }
259                 
260                 int counter = 0;
261                 for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
262             
263                         fieldsString.append("" + entry.getKey());
264             Object valueObj = entry.getValue();
265             if (primaryKeyName.equals(entry.getKey())) {
266                 primaryKey = entry.getValue() + "";
267                 primaryKey = primaryKey.replace("'", "''");
268             }
269             DataType colType = null;
270             try {
271                 colType = tableInfo.getColumn(entry.getKey()).getType();
272             } catch(NullPointerException ex) {
273                 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() +" Invalid column name : "+entry.getKey(), AppMessages.INCORRECTDATA  ,ErrorSeverity.CRITICAL, ErrorTypes.DATAERROR);
274                
275             }
276
277             Object formattedValue = null;
278             try {
279               formattedValue = MusicUtil.convertToActualDataType(colType, valueObj);
280             } catch (Exception e) {
281               logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
282           }
283             
284                         valueString.append("?");
285             queryObject.addValue(formattedValue);
286
287             
288                         if (counter == valuesMap.size() - 1) {
289                 fieldsString.append(")");
290                 valueString.append(")");
291             } else {
292                 fieldsString.append(",");
293                 valueString.append(",");
294             }
295             counter = counter + 1;
296         }
297         queryObject.appendQueryString("INSERT INTO " + keySpaceName + "." + tableName + " "
298                 + fieldsString + " VALUES " + valueString);
299                 return queryObject;
300         }
301         
302         public static Object getColValue(Row row, String colName, DataType colType) {
303                 switch (colType.getName()) {
304                 case VARCHAR:
305                         return row.getString(colName);
306                 case UUID:
307                         return row.getUUID(colName);
308                 case VARINT:
309                         return row.getVarint(colName);
310                 case BIGINT:
311                         return row.getLong(colName);
312                 case INT:
313                         return row.getInt(colName);
314                 case FLOAT:
315                         return row.getFloat(colName);
316                 case DOUBLE:
317                         return row.getDouble(colName);
318                 case BOOLEAN:
319                         return row.getBool(colName);
320                 case MAP:
321                         return row.getMap(colName, String.class, String.class);
322                 default:
323                         return null;
324                 }
325         }
326         
327         @SuppressWarnings("unchecked")
328         public static Map<String, String> cascadeColumnUpdateSpecific(Row row, Map<String, String> changeOfStatus,
329                         String cascadeColumnName, String planId) {
330
331                 ColumnDefinitions colInfo = row.getColumnDefinitions();
332                 DataType colType = colInfo.getType(cascadeColumnName);
333                 Map<String, String> values = new HashMap<>();
334                 Object columnValue = getColValue(row, cascadeColumnName, colType);
335
336                 Map<String, String> finalValues = new HashMap<>();
337                 values = (Map<String, String>) columnValue;
338                 if (values != null && values.keySet().contains(planId)) {
339                         String valueString = values.get(planId);
340                         String tempValueString = valueString.replaceAll("\\{", "").replaceAll("\"", "").replaceAll("\\}", "");
341                         String[] elements = tempValueString.split(",");
342                         for (String str : elements) {
343                                 String[] keyValue = str.split(":");
344                                 if ((changeOfStatus.keySet().contains(keyValue[0].replaceAll("\\s", ""))))
345                                 keyValue[1] = changeOfStatus.get(keyValue[0].replaceAll("\\s", ""));
346                                 finalValues.put(keyValue[0], keyValue[1]);
347                         }
348                 }
349                 return finalValues;
350
351         }
352
353 }