Update locking to use Threadsafe set
[music.git] / music-core / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Modifications Copyright (c) 2018-2019 IBM
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.datastore;
27
28 import java.nio.ByteBuffer;
29 import java.util.HashMap;
30 import java.util.Map;
31
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.eelf.logging.format.AppMessages;
34 import org.onap.music.eelf.logging.format.ErrorSeverity;
35 import org.onap.music.eelf.logging.format.ErrorTypes;
36 import org.onap.music.exceptions.MusicQueryException;
37 import org.onap.music.exceptions.MusicServiceException;
38 import org.onap.music.lockingservice.cassandra.LockType;
39 import org.onap.music.main.CipherUtil;
40 import org.onap.music.main.MusicUtil;
41 import com.datastax.driver.core.Cluster;
42 import com.datastax.driver.core.ColumnDefinitions;
43 import com.datastax.driver.core.ColumnDefinitions.Definition;
44 import com.datastax.driver.core.ConsistencyLevel;
45 import com.datastax.driver.core.DataType;
46 import com.datastax.driver.core.HostDistance;
47 import com.datastax.driver.core.KeyspaceMetadata;
48 import com.datastax.driver.core.Metadata;
49 import com.datastax.driver.core.PoolingOptions;
50 import com.datastax.driver.core.ResultSet;
51 import com.datastax.driver.core.Row;
52 import com.datastax.driver.core.Session;
53 import com.datastax.driver.core.SimpleStatement;
54 import com.datastax.driver.core.TableMetadata;
55 import com.datastax.driver.core.exceptions.AlreadyExistsException;
56 import com.datastax.driver.core.exceptions.InvalidQueryException;
57 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
58
59 /**
60  * @author nelson24
61  *
62  */
63 public class MusicDataStore {
64
65     public static final String CONSISTENCY_LEVEL_ONE = "ONE";
66     public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
67     public static final String CONSISTENCY_LEVEL_LOCAL_QUORUM = "LOCAL_QUORUM";
68     private Session session;
69     private Cluster cluster;
70
71
72     /**
73      * Connect to default Cassandra address
74      */
75     public MusicDataStore() {
76         try {
77             connectToCassaCluster(MusicUtil.getMyCassaHost());
78         } catch (MusicServiceException e) {
79             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
80         }
81     }
82
83
84     /**
85      * @param cluster
86      * @param session
87      */
88     public MusicDataStore(Cluster cluster, Session session) {
89         this.session = session;
90         setCluster(cluster);
91     }
92
93     
94     /**
95      * @param session
96      */
97     public void setSession(Session session) {
98         this.session = session;
99     }
100
101     /**
102      * @param session
103      */
104     public Session getSession() {
105         return session;
106     }
107
108     /**
109      * @param cluster
110      */
111     public void setCluster(Cluster cluster) {
112         EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
113         cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
114         
115         this.cluster = cluster;
116     }
117     
118     public Cluster getCluster() {
119         return this.cluster;
120     }
121
122
123     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
124
125     
126     /**
127      *
128      * @param remoteIp
129      * @throws MusicServiceException
130      */
131     public MusicDataStore(String remoteIp) {
132         try {
133             connectToCassaCluster(remoteIp);
134         } catch (MusicServiceException e) {
135             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
136         }
137     }
138
139     /**
140      *
141      */
142     public void close() {
143         session.close();
144     }
145
146     /**
147      * This method connects to cassandra cluster on specific address.
148      *
149      * @param address
150      */
151     private void connectToCassaCluster(String address) throws MusicServiceException {
152         String[] addresses = null;
153         addresses = address.split(",");
154         PoolingOptions poolingOptions = new PoolingOptions();
155         poolingOptions
156         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
157         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
158         
159         Cluster cluster;
160         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
161             String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
162             logger.info(EELFLoggerDelegate.applicationLogger,
163                     "Building with credentials "+MusicUtil.getCassName()+" & "+ MusicUtil.getCassPwd());
164             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
165                         .withCredentials(MusicUtil.getCassName(), cassPwd)
166                         //.withLoadBalancingPolicy(new RoundRobinPolicy())
167                         .withoutJMXReporting()
168                         .withPoolingOptions(poolingOptions)
169                         .addContactPoints(addresses).build();
170         } else {
171             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
172                         .withoutJMXReporting()
173                         .withPoolingOptions(poolingOptions)
174                         .addContactPoints(addresses)
175                         .build();
176         }
177         
178         this.setCluster(cluster);
179         Metadata metadata = this.cluster.getMetadata();
180         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
181                         + metadata.getClusterName() + " at " + address);
182
183         try {
184             session = this.cluster.connect();
185         } catch (Exception ex) {
186             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
187                 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
188             throw new MusicServiceException(
189                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
190         }
191     }
192
193     /**
194      *
195      * @param keyspace
196      * @param tableName
197      * @param columnName
198      * @return DataType
199      */
200     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
201         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
202         TableMetadata table = ks.getTable(tableName);
203         return table.getColumn(columnName).getType();
204
205     }
206
207     /**
208      *
209      * @param keyspace
210      * @param tableName
211      * @return TableMetadata
212      */
213     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
214         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
215         return ks.getTable(tableName);
216     }
217     
218     /**
219     *
220     * @param keyspace
221     * @param tableName
222     * @return TableMetadata
223     */
224    public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
225        return cluster.getMetadata().getKeyspace(keyspace);
226    }
227
228
229     /**
230      * Utility function to return the Java specific object type.
231      *
232      * @param row
233      * @param colName
234      * @param colType
235      * @return
236      */
237     public Object getColValue(Row row, String colName, DataType colType) {
238
239         switch (colType.getName()) {
240             case VARCHAR:
241                 return row.getString(colName);
242             case UUID:
243                 return row.getUUID(colName);
244             case VARINT:
245                 return row.getVarint(colName);
246             case BIGINT:
247                 return row.getLong(colName);
248             case INT:
249                 return row.getInt(colName);
250             case FLOAT:
251                 return row.getFloat(colName);
252             case DOUBLE:
253                 return row.getDouble(colName);
254             case BOOLEAN:
255                 return row.getBool(colName);
256             case MAP:
257                 return row.getMap(colName, String.class, String.class);
258             case LIST:
259                 return row.getList(colName, String.class);
260             default:
261                 return null;
262         }
263     }
264
265     public byte[] getBlobValue(Row row, String colName, DataType colType) {
266         ByteBuffer bb = row.getBytes(colName);
267         return bb.array();
268     }
269
270     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
271         ColumnDefinitions colInfo = row.getColumnDefinitions();
272
273         for (Map.Entry<String, Object> entry : condition.entrySet()) {
274             String colName = entry.getKey();
275             DataType colType = colInfo.getType(colName);
276             Object columnValue = getColValue(row, colName, colType);
277             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
278             if (columnValue.equals(conditionValue) == false)
279                 return false;
280         }
281         return true;
282     }
283
284     /**
285      * Utility function to store ResultSet values in to a MAP for output.
286      *
287      * @param results
288      * @return MAP
289      */
290     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
291         Map<String, HashMap<String, Object>> resultMap =
292                         new HashMap<>();
293         int counter = 0;
294         for (Row row : results) {
295             ColumnDefinitions colInfo = row.getColumnDefinitions();
296             HashMap<String, Object> resultOutput = new HashMap<>();
297             for (Definition definition : colInfo) {
298                 if (!(("vector_ts").equals(definition.getName()))) {
299                     if(definition.getType().toString().toLowerCase().contains("blob")) {
300                         resultOutput.put(definition.getName(),
301                                 getBlobValue(row, definition.getName(), definition.getType()));
302                     } else {
303                         resultOutput.put(definition.getName(),
304                                     getColValue(row, definition.getName(), definition.getType()));
305                     }
306                 }
307             }
308             resultMap.put("row " + counter, resultOutput);
309             counter++;
310         }
311         return resultMap;
312     }
313
314
315     // Prepared Statements 1802 additions
316     
317     public boolean executePut(PreparedQueryObject queryObject, String consistency)
318             throws MusicServiceException, MusicQueryException {
319         return executePut(queryObject, consistency, 0);
320     }
321     /**
322      * This Method performs DDL and DML operations on Cassandra using specified consistency level
323      *
324      * @param queryObject Object containing cassandra prepared query and values.
325      * @param consistency Specify consistency level for data synchronization across cassandra
326      *        replicas
327      * @return Boolean Indicates operation success or failure
328      * @throws MusicServiceException
329      * @throws MusicQueryException
330      */
331     public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
332                     throws MusicServiceException, MusicQueryException {
333
334         boolean result = false;
335         long timeOfWrite = System.currentTimeMillis();
336         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
337             logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
338             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
339                             + queryObject.getQuery() + "]");
340         }
341         logger.debug(EELFLoggerDelegate.applicationLogger,
342                         "In preprared Execute Put: the actual insert query:"
343                                         + queryObject.getQuery() + "; the values"
344                                         + queryObject.getValues());
345         SimpleStatement preparedInsert = null;
346
347         try {
348             preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
349             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
350                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
351                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
352             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
353                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
354                 if(queryObject.getConsistency() == null)
355                     preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
356                 else
357                     preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
358             } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
359                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
360             }  else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
361                 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
362             } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
363                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
364             }
365             long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
366             preparedInsert.setDefaultTimestamp(timestamp);
367
368             ResultSet rs = session.execute(preparedInsert);
369             result = rs.wasApplied();
370
371         }
372         catch (AlreadyExistsException ae) {
373             // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
374             // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
375             throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
376         } catch ( InvalidQueryException e ) {
377             // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
378             // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
379             throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
380         } catch (Exception e) {
381             // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
382             //     + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
383             throw new MusicServiceException("Executing Session Failure for Request = " + "["
384                 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
385         }
386         return result;
387     }
388
389  /*   *//**
390      * This method performs DDL operations on Cassandra using consistency level ONE.
391      *
392      * @param queryObject Object containing cassandra prepared query and values.
393      * @return ResultSet
394      * @throws MusicServiceException
395      * @throws MusicQueryException
396      *//*
397     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
398                     throws MusicServiceException, MusicQueryException {
399         CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
400         PreparedStatement preparedEventualGet = null;
401         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
402             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
403             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
404                             + queryObject.getQuery() + "]");
405         }
406         logger.info(EELFLoggerDelegate.applicationLogger,
407                         "Executing Eventual  get query:" + queryObject.getQuery());
408
409         ResultSet results = null;
410         try {
411             if(queryBank.get(queryObject.getQuery()) != null )
412                 preparedEventualGet=queryBank.get(queryObject.getQuery());
413             else {
414                 preparedEventualGet = session.prepare(queryObject.getQuery());
415                 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
416             }
417             if(queryObject.getConsistency() == null) {
418                 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
419             } else {
420                 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
421             }
422             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
423
424         } catch (Exception ex) {
425             logger.error("Exception", ex);
426             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
427             throw new MusicServiceException(ex.getMessage());
428         }
429         return results;
430     }
431
432     *//**
433      *
434      * This method performs DDL operation on Cassandra using consistency level QUORUM.
435      *
436      * @param queryObject Object containing cassandra prepared query and values.
437      * @return ResultSet
438      * @throws MusicServiceException
439      * @throws MusicQueryException
440      *//*
441     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
442                     throws MusicServiceException, MusicQueryException {
443         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
444             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
445             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
446                             + queryObject.getQuery() + "]");
447         }
448         logger.info(EELFLoggerDelegate.applicationLogger,
449                         "Executing Critical get query:" + queryObject.getQuery());
450         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
451         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
452         ResultSet results = null;
453         try {
454             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
455         } catch (Exception ex) {
456             logger.error("Exception", ex);
457             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
458             throw new MusicServiceException(ex.getMessage());
459         }
460         return results;
461
462     }
463     */
464     public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
465         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
466             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
467             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
468                             + queryObject.getQuery() + "]");
469         }
470         ResultSet results = null;
471         try {
472             SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
473
474             if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
475                 if(queryObject.getConsistency() == null) {
476                     statement.setConsistencyLevel(ConsistencyLevel.ONE);
477                 } else {
478                     statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
479                 }
480             }
481             else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
482                 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
483             }
484
485             results = session.execute(statement);
486
487         } catch (Exception ex) {
488             logger.error(EELFLoggerDelegate.errorLogger, "Execute Get Error" + ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
489                 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
490             throw new MusicServiceException("Execute Get Error" + ex.getMessage());
491         }
492         
493         return results;
494         
495     }
496     
497     /**
498      * This method performs DDL operations on Cassandra using consistency level ONE.
499      * 
500      * @param queryObject Object containing cassandra prepared query and values.
501      */
502     public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
503                     throws MusicServiceException, MusicQueryException {
504         return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
505     }
506     
507     /**
508      * 
509      * This method performs DDL operation on Cassandra using consistency level LOCAL_QUORUM.
510      * 
511      * @param queryObject Object containing cassandra prepared query and values.
512      */
513     public ResultSet executeLocalQuorumConsistencyGet(PreparedQueryObject queryObject)
514                     throws MusicServiceException, MusicQueryException {
515         return executeGet(queryObject, CONSISTENCY_LEVEL_LOCAL_QUORUM);
516     }
517     
518     /**
519      * 
520      * This method performs DDL operation on Cassandra using consistency level QUORUM.
521      * 
522      * @param queryObject Object containing cassandra prepared query and values.
523      */
524     public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
525                     throws MusicServiceException, MusicQueryException {
526         return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);
527     }
528     
529 }