Push variuos changes
[music.git] / src / main / java / org / onap / music / conductor / conditionals / MusicConditional.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  * 
11  *     http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  *  Unless required by applicable law or agreed to in writing, software
14  *  distributed under the License is distributed on an "AS IS" BASIS,
15  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  *  See the License for the specific language governing permissions and
17  *  limitations under the License.
18  * 
19  * ============LICENSE_END=============================================
20  * ====================================================================
21  */
22
23 package org.onap.music.conductor.conditionals;
24
25 import java.io.PrintWriter;
26 import java.io.StringWriter;
27 import java.util.HashMap;
28 import java.util.Map;
29
30 import org.codehaus.jettison.json.JSONObject;
31 import org.onap.music.datastore.MusicDataStoreHandle;
32 import org.onap.music.datastore.PreparedQueryObject;
33 import org.onap.music.eelf.logging.EELFLoggerDelegate;
34 import org.onap.music.eelf.logging.format.AppMessages;
35 import org.onap.music.eelf.logging.format.ErrorSeverity;
36 import org.onap.music.eelf.logging.format.ErrorTypes;
37 import org.onap.music.exceptions.MusicLockingException;
38 import org.onap.music.exceptions.MusicQueryException;
39 import org.onap.music.exceptions.MusicServiceException;
40 import org.onap.music.lockingservice.cassandra.MusicLockState;
41 import org.onap.music.main.MusicCore;
42 import org.onap.music.main.MusicUtil;
43 import org.onap.music.main.ResultType;
44 import org.onap.music.main.ReturnType;
45 import org.onap.music.rest.RestMusicDataAPI;
46 import org.onap.music.service.impl.MusicZKCore;
47
48 import com.datastax.driver.core.ColumnDefinitions;
49 import com.datastax.driver.core.DataType;
50 import com.datastax.driver.core.ResultSet;
51 import com.datastax.driver.core.Row;
52 import com.datastax.driver.core.TableMetadata;
53
54 public class MusicConditional {
55     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(RestMusicDataAPI.class);
56
57     public static ReturnType conditionalInsert(String keyspace, String tablename, String casscadeColumnName,
58             Map<String, Object> casscadeColumnData, String primaryKey, Map<String, Object> valuesMap,
59             Map<String, String> status) throws Exception {
60
61         Map<String, PreparedQueryObject> queryBank = new HashMap<>();
62         TableMetadata tableInfo = null;
63         tableInfo = MusicDataStoreHandle.returnColumnMetadata(keyspace, tablename);
64         DataType primaryIdType = tableInfo.getPrimaryKey().get(0).getType();
65         String primaryId = tableInfo.getPrimaryKey().get(0).getName();
66         DataType casscadeColumnType = tableInfo.getColumn(casscadeColumnName).getType();
67         String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
68
69         PreparedQueryObject select = new PreparedQueryObject();
70         select.appendQueryString("SELECT * FROM " + keyspace + "." + tablename + " where " + primaryId + " = ?");
71         select.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
72         queryBank.put(MusicUtil.SELECT, select);
73
74         PreparedQueryObject update = new PreparedQueryObject();
75         Map<String, String> updateColumnvalues = new HashMap<>(); //casscade column values
76         updateColumnvalues = getValues(true, casscadeColumnData, status);
77         Object formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, updateColumnvalues);
78         update.appendQueryString("UPDATE " + keyspace + "." + tablename + " SET " + casscadeColumnName + " ="
79                 + casscadeColumnName + " + ? , vector_ts = ?" + " WHERE " + primaryId + " = ? ");
80         update.addValue(formatedValues);
81         update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector));
82         update.addValue(MusicUtil.convertToActualDataType(primaryIdType, primaryKey));
83         queryBank.put(MusicUtil.UPDATE, update);
84
85
86         Map<String, String> insertColumnvalues = new HashMap<>();//casscade column values
87         insertColumnvalues = getValues(false, casscadeColumnData, status);
88         formatedValues = MusicUtil.convertToActualDataType(casscadeColumnType, insertColumnvalues);
89         PreparedQueryObject insert = extractQuery(valuesMap, tableInfo, tablename, keyspace, primaryId, primaryKey,casscadeColumnName,formatedValues);
90         queryBank.put(MusicUtil.INSERT, insert);
91         
92         
93         String key = keyspace + "." + tablename + "." + primaryKey;
94         String lockId = MusicCore.createLockReference(key);
95         long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
96         ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
97
98         try {
99             if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
100                 ReturnType criticalPutResult = conditionalInsertAtomic(lockId, keyspace, tablename, primaryKey,
101                         queryBank);
102                 MusicCore.destroyLockRef(lockId);
103                 if (criticalPutResult.getMessage().contains("insert"))
104                     criticalPutResult
105                             .setMessage("Insert values: ");
106                 else if (criticalPutResult.getMessage().contains("update"))
107                     criticalPutResult
108                             .setMessage("Update values: " + updateColumnvalues);
109                 return criticalPutResult;
110
111             } else {
112                 MusicCore.destroyLockRef(lockId);
113                 return lockAcqResult;
114             }
115         } catch (Exception e) {
116             MusicCore.destroyLockRef(lockId);
117             return new ReturnType(ResultType.FAILURE, e.getMessage());
118         }
119
120     }
121
122     public static ReturnType conditionalInsertAtomic(String lockId, String keyspace, String tableName,
123             String primaryKey, Map<String, PreparedQueryObject> queryBank) {
124
125         ResultSet results = null;
126
127         try {
128             String fullyQualifiedKey = keyspace + "." + tableName + "." + primaryKey;
129             ReturnType lockAcqResult = MusicCore.acquireLock(fullyQualifiedKey, lockId);
130             //MusicLockState mls = MusicZKCore.getLockingServiceHandle()
131                     //.getLockState(keyspace + "." + tableName + "." + primaryKey);
132             if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
133                 try {
134                     results = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT));
135                 } catch (Exception e) {
136                     return new ReturnType(ResultType.FAILURE, e.getMessage());
137                 }
138                 if (results.all().isEmpty()) {
139                     MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.INSERT), "critical");
140                     return new ReturnType(ResultType.SUCCESS, "insert");
141                 } else {
142                     MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.UPDATE), "critical");
143                     return new ReturnType(ResultType.SUCCESS, "update");
144                 }
145             } else {
146                 return new ReturnType(ResultType.FAILURE,
147                         "Cannot perform operation since you are the not the lock holder");
148             }
149
150         } catch (Exception e) {
151             StringWriter sw = new StringWriter();
152             e.printStackTrace(new PrintWriter(sw));
153             String exceptionAsString = sw.toString();
154             return new ReturnType(ResultType.FAILURE,
155                     "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
156                             + exceptionAsString);
157         }
158
159     }
160
161     public static ReturnType update(Map<String,PreparedQueryObject> queryBank, String keyspace, String tableName, String primaryKey,String primaryKeyValue,String planId,String cascadeColumnName,Map<String,String> cascadeColumnValues) throws MusicLockingException, MusicQueryException, MusicServiceException {
162
163         String key = keyspace + "." + tableName + "." + primaryKeyValue;
164         String lockId = MusicCore.createLockReference(key);
165         long leasePeriod = MusicUtil.getDefaultLockLeasePeriod();
166         ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
167
168         try {
169
170             if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
171                 ReturnType criticalPutResult = updateAtomic(lockId, keyspace, tableName, primaryKey,primaryKeyValue, queryBank,planId,cascadeColumnValues,cascadeColumnName);
172                 MusicCore.destroyLockRef(lockId);
173                 return criticalPutResult;
174             } else {
175                 MusicCore.destroyLockRef(lockId);
176                 return lockAcqResult;
177             }
178
179         } catch (Exception e) {
180             MusicCore.destroyLockRef(lockId);
181             return new ReturnType(ResultType.FAILURE, e.getMessage());
182
183         }
184     }
185
186     public static ReturnType updateAtomic(String lockId, String keyspace, String tableName, String primaryKey,String primaryKeyValue,
187             Map<String,PreparedQueryObject> queryBank,String planId,Map<String,String> cascadeColumnValues,String casscadeColumnName) {
188         try {
189             String fullyQualifiedKey = keyspace + "." + tableName + "." + primaryKeyValue;
190             ReturnType lockAcqResult = MusicCore.acquireLock(fullyQualifiedKey, lockId);
191             //MusicLockState mls = MusicZKCore.getLockingServiceHandle()
192                     //.getLockState(keyspace + "." + tableName + "." + primaryKeyValue);
193             if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
194                 Row row  = MusicDataStoreHandle.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT)).one();
195                 
196                 if(row != null) {
197                     Map<String, String> updatedValues = cascadeColumnUpdateSpecific(row, cascadeColumnValues, casscadeColumnName, planId);
198                     JSONObject json = new JSONObject(updatedValues);
199                     PreparedQueryObject update = new PreparedQueryObject();
200                     String vector_ts = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
201                     update.appendQueryString("UPDATE " + keyspace + "." + tableName + " SET " + casscadeColumnName + "['" + planId
202                             + "'] = ?, vector_ts = ? WHERE " + primaryKey + " = ?");
203                     update.addValue(MusicUtil.convertToActualDataType(DataType.text(), json.toString()));
204                     update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector_ts));
205                     update.addValue(MusicUtil.convertToActualDataType(DataType.text(), primaryKeyValue));
206                     try {
207                         MusicDataStoreHandle.getDSHandle().executePut(update, "critical");
208                     } catch (Exception ex) {
209                         return new ReturnType(ResultType.FAILURE, ex.getMessage());
210                     }
211                 }else {
212                     return new ReturnType(ResultType.FAILURE,"Cannot find data related to key: "+primaryKey);
213                 }
214                 MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.UPSERT), "critical");
215                 return new ReturnType(ResultType.SUCCESS, "update success");
216
217             } else {
218                 return new ReturnType(ResultType.FAILURE,
219                         "Cannot perform operation since you are the not the lock holder");
220             }
221
222         } catch (Exception e) {
223             StringWriter sw = new StringWriter();
224             e.printStackTrace(new PrintWriter(sw));
225             String exceptionAsString = sw.toString();
226             return new ReturnType(ResultType.FAILURE,
227                     "Exception thrown while doing the critical put, check sanctity of the row/conditions:\n"
228                             + exceptionAsString);
229         }
230
231     }
232
233     @SuppressWarnings("unchecked")
234     public static Map<String, String> getValues(boolean isExists, Map<String, Object> casscadeColumnData,
235             Map<String, String> status) {
236
237         Map<String, String> value = new HashMap<>();
238         Map<String, String> returnMap = new HashMap<>();
239         Object key = casscadeColumnData.get("key");
240         String setStatus = "";
241         value = (Map<String, String>) casscadeColumnData.get("value");
242
243         if (isExists)
244             setStatus = status.get("exists");
245         else
246             setStatus = status.get("nonexists");
247
248         value.put("status", setStatus);
249         JSONObject valueJson = new JSONObject(value);
250         returnMap.put(key.toString(), valueJson.toString());
251         return returnMap;
252
253     }
254     
255     public static PreparedQueryObject extractQuery(Map<String, Object> valuesMap, TableMetadata tableInfo, String tableName,
256             String keySpaceName,String primaryKeyName,String primaryKey,String casscadeColumn,Object casscadeColumnValues) throws Exception {
257
258         PreparedQueryObject queryObject = new PreparedQueryObject();
259         StringBuilder fieldsString = new StringBuilder("(vector_ts"+",");
260         StringBuilder valueString = new StringBuilder("(" + "?" + ",");
261         String vector = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
262         queryObject.addValue(vector);
263         if(casscadeColumn!=null && casscadeColumnValues!=null) {
264             fieldsString.append("" +casscadeColumn+" ," );
265           valueString.append("?,");
266           queryObject.addValue(casscadeColumnValues);
267         }
268         
269         int counter = 0;
270         for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
271             
272             fieldsString.append("" + entry.getKey());
273             Object valueObj = entry.getValue();
274             if (primaryKeyName.equals(entry.getKey())) {
275                 primaryKey = entry.getValue() + "";
276                 primaryKey = primaryKey.replace("'", "''");
277             }
278             DataType colType = null;
279             try {
280                 colType = tableInfo.getColumn(entry.getKey()).getType();
281             } catch(NullPointerException ex) {
282                 logger.error(EELFLoggerDelegate.errorLogger,ex.getMessage() +" Invalid column name : "+entry.getKey(), AppMessages.INCORRECTDATA  ,ErrorSeverity.CRITICAL, ErrorTypes.DATAERROR);
283                
284             }
285
286             Object formattedValue = null;
287             try {
288               formattedValue = MusicUtil.convertToActualDataType(colType, valueObj);
289             } catch (Exception e) {
290               logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
291           }
292             
293             valueString.append("?");
294             queryObject.addValue(formattedValue);
295
296             
297             if (counter == valuesMap.size() - 1) {
298                 fieldsString.append(")");
299                 valueString.append(")");
300             } else {
301                 fieldsString.append(",");
302                 valueString.append(",");
303             }
304             counter = counter + 1;
305         }
306         queryObject.appendQueryString("INSERT INTO " + keySpaceName + "." + tableName + " "
307                 + fieldsString + " VALUES " + valueString);
308         return queryObject;
309     }
310     
311     public static Object getColValue(Row row, String colName, DataType colType) {
312         switch (colType.getName()) {
313         case VARCHAR:
314             return row.getString(colName);
315         case UUID:
316             return row.getUUID(colName);
317         case VARINT:
318             return row.getVarint(colName);
319         case BIGINT:
320             return row.getLong(colName);
321         case INT:
322             return row.getInt(colName);
323         case FLOAT:
324             return row.getFloat(colName);
325         case DOUBLE:
326             return row.getDouble(colName);
327         case BOOLEAN:
328             return row.getBool(colName);
329         case MAP:
330             return row.getMap(colName, String.class, String.class);
331         default:
332             return null;
333         }
334     }
335     
336     @SuppressWarnings("unchecked")
337     public static Map<String, String> cascadeColumnUpdateSpecific(Row row, Map<String, String> changeOfStatus,
338             String cascadeColumnName, String planId) {
339
340         ColumnDefinitions colInfo = row.getColumnDefinitions();
341         DataType colType = colInfo.getType(cascadeColumnName);
342         Map<String, String> values = new HashMap<>();
343         Object columnValue = getColValue(row, cascadeColumnName, colType);
344
345         Map<String, String> finalValues = new HashMap<>();
346         values = (Map<String, String>) columnValue;
347         if (values != null && values.keySet().contains(planId)) {
348             String valueString = values.get(planId);
349             String tempValueString = valueString.replaceAll("\\{", "").replaceAll("\"", "").replaceAll("\\}", "");
350             String[] elements = tempValueString.split(",");
351             for (String str : elements) {
352                 String[] keyValue = str.split(":");
353                 if ((changeOfStatus.keySet().contains(keyValue[0].replaceAll("\\s", ""))))
354                 keyValue[1] = changeOfStatus.get(keyValue[0].replaceAll("\\s", ""));
355                 finalValues.put(keyValue[0], keyValue[1]);
356             }
357         }
358         return finalValues;
359
360     }
361
362 }