Added properties for Cassandra connect and read timeouts, with JUnit test cases.
[music.git] / music-core / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Modifications Copyright (c) 2018-2019 IBM
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.datastore;
27
28 import java.nio.ByteBuffer;
29 import java.util.HashMap;
30 import java.util.Map;
31
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.eelf.logging.format.AppMessages;
34 import org.onap.music.eelf.logging.format.ErrorSeverity;
35 import org.onap.music.eelf.logging.format.ErrorTypes;
36 import org.onap.music.exceptions.MusicQueryException;
37 import org.onap.music.exceptions.MusicServiceException;
38 import org.onap.music.lockingservice.cassandra.LockType;
39 import org.onap.music.main.CipherUtil;
40 import org.onap.music.main.MusicUtil;
41 import com.datastax.driver.core.Cluster;
42 import com.datastax.driver.core.ColumnDefinitions;
43 import com.datastax.driver.core.ColumnDefinitions.Definition;
44 import com.datastax.driver.core.ConsistencyLevel;
45 import com.datastax.driver.core.DataType;
46 import com.datastax.driver.core.HostDistance;
47 import com.datastax.driver.core.KeyspaceMetadata;
48 import com.datastax.driver.core.Metadata;
49 import com.datastax.driver.core.PoolingOptions;
50 import com.datastax.driver.core.ResultSet;
51 import com.datastax.driver.core.Row;
52 import com.datastax.driver.core.Session;
53 import com.datastax.driver.core.SimpleStatement;
54 import com.datastax.driver.core.SocketOptions;
55 import com.datastax.driver.core.TableMetadata;
56 import com.datastax.driver.core.exceptions.AlreadyExistsException;
57 import com.datastax.driver.core.exceptions.InvalidQueryException;
58 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
59
60 /**
61  * @author nelson24
62  *
63  */
64 public class MusicDataStore {
65
    // Consistency-level names accepted (case-insensitively) by executeGet.
    public static final String CONSISTENCY_LEVEL_ONE = "ONE";
    public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
    public static final String CONSISTENCY_LEVEL_LOCAL_QUORUM = "LOCAL_QUORUM";
    // Session used to execute statements; assigned on connect, by the two-arg constructor, or by setSession.
    private Session session;
    // Cluster handle used for metadata lookups and for opening the session; assigned by setCluster.
    private Cluster cluster;
71
72
73     /**
74      * Connect to default Cassandra address
75      */
76     public MusicDataStore() {
77         try {
78             connectToCassaCluster(MusicUtil.getMyCassaHost());
79         } catch (MusicServiceException e) {
80             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
81         }
82     }
83
84
85     /**
86      * @param cluster
87      * @param session
88      */
89     public MusicDataStore(Cluster cluster, Session session) {
90         this.session = session;
91         setCluster(cluster);
92     }
93
94
95     /**
96      * @param session
97      */
98     public void setSession(Session session) {
99         this.session = session;
100     }
101
102     /**
103      * @param session
104      */
105     public Session getSession() {
106         return session;
107     }
108
109     /**
110      * @param cluster
111      */
112     public void setCluster(Cluster cluster) {
113         EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
114         cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
115
116         this.cluster = cluster;
117     }
118
119     public Cluster getCluster() {
120         return this.cluster;
121     }
122
123
124     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
125
126
127     /**
128      *
129      * @param remoteIp
130      * @throws MusicServiceException
131      */
132     public MusicDataStore(String remoteIp) {
133         try {
134             connectToCassaCluster(remoteIp);
135         } catch (MusicServiceException e) {
136             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
137         }
138     }
139
140     /**
141      *
142      */
143     public void close() {
144         session.close();
145     }
146
147     /**
148      * This method connects to cassandra cluster on specific address.
149      *
150      * @param address
151      */
152     private void connectToCassaCluster(String address) throws MusicServiceException {
153         String[] addresses = null;
154         addresses = address.split(",");
155         PoolingOptions poolingOptions = new PoolingOptions();
156         poolingOptions
157         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
158         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
159
160         Cluster cluster;
161         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
162             String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
163             logger.info(EELFLoggerDelegate.applicationLogger,
164                     "Building with credentials "+MusicUtil.getCassName()+" & "+ MusicUtil.getCassPwd());
165             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
166                     .withCredentials(MusicUtil.getCassName(), cassPwd)
167                     //.withLoadBalancingPolicy(new RoundRobinPolicy())
168                     .withoutJMXReporting()
169                     .withPoolingOptions(poolingOptions)
170                     .withSocketOptions(
171                             new SocketOptions().setConnectTimeoutMillis(MusicUtil.getCassandraConnectTimeOutMS())
172                             .setReadTimeoutMillis(MusicUtil.getCassandraReadTimeOutMS()))
173                     .addContactPoints(addresses).build();
174         } else {
175             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
176                     .withoutJMXReporting()
177                     .withPoolingOptions(poolingOptions)
178                     .withSocketOptions(new SocketOptions()
179                             .setConnectTimeoutMillis(MusicUtil.getCassandraConnectTimeOutMS())
180                             .setReadTimeoutMillis(MusicUtil.getCassandraReadTimeOutMS()))
181                     .addContactPoints(addresses)
182                     .build();
183         }
184
185         this.setCluster(cluster);
186         Metadata metadata = this.cluster.getMetadata();
187         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
188                 + metadata.getClusterName() + " at " + address);
189
190         try {
191             session = this.cluster.connect();
192         } catch (Exception ex) {
193             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
194                     ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
195             throw new MusicServiceException(
196                     "Error while connecting to Cassandra cluster.. " + ex.getMessage());
197         }
198     }
199
200     /**
201      *
202      * @param keyspace
203      * @param tableName
204      * @param columnName
205      * @return DataType
206      */
207     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
208         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
209         TableMetadata table = ks.getTable(tableName);
210         return table.getColumn(columnName).getType();
211
212     }
213
214     /**
215      *
216      * @param keyspace
217      * @param tableName
218      * @return TableMetadata
219      */
220     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
221         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
222         return ks.getTable(tableName);
223     }
224
225     /**
226      *
227      * @param keyspace
228      * @param tableName
229      * @return TableMetadata
230      */
231     public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
232         return cluster.getMetadata().getKeyspace(keyspace);
233     }
234
235
236     /**
237      * Utility function to return the Java specific object type.
238      *
239      * @param row
240      * @param colName
241      * @param colType
242      * @return
243      */
244     public Object getColValue(Row row, String colName, DataType colType) {
245
246         switch (colType.getName()) {
247             case VARCHAR:
248                 return row.getString(colName);
249             case UUID:
250                 return row.getUUID(colName);
251             case VARINT:
252                 return row.getVarint(colName);
253             case BIGINT:
254                 return row.getLong(colName);
255             case INT:
256                 return row.getInt(colName);
257             case FLOAT:
258                 return row.getFloat(colName);
259             case DOUBLE:
260                 return row.getDouble(colName);
261             case BOOLEAN:
262                 return row.getBool(colName);
263             case MAP:
264                 return row.getMap(colName, String.class, String.class);
265             case LIST:
266                 return row.getList(colName, String.class);
267             default:
268                 return null;
269         }
270     }
271
272     public byte[] getBlobValue(Row row, String colName, DataType colType) {
273         ByteBuffer bb = row.getBytes(colName);
274         return bb.array();
275     }
276
277     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
278         ColumnDefinitions colInfo = row.getColumnDefinitions();
279
280         for (Map.Entry<String, Object> entry : condition.entrySet()) {
281             String colName = entry.getKey();
282             DataType colType = colInfo.getType(colName);
283             Object columnValue = getColValue(row, colName, colType);
284             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
285             if (columnValue.equals(conditionValue) == false)
286                 return false;
287         }
288         return true;
289     }
290
291     /**
292      * Utility function to store ResultSet values in to a MAP for output.
293      *
294      * @param results
295      * @return MAP
296      */
297     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
298         Map<String, HashMap<String, Object>> resultMap =
299                 new HashMap<>();
300         int counter = 0;
301         for (Row row : results) {
302             ColumnDefinitions colInfo = row.getColumnDefinitions();
303             HashMap<String, Object> resultOutput = new HashMap<>();
304             for (Definition definition : colInfo) {
305                 if (!(("vector_ts").equals(definition.getName()))) {
306                     if(definition.getType().toString().toLowerCase().contains("blob")) {
307                         resultOutput.put(definition.getName(),
308                                 getBlobValue(row, definition.getName(), definition.getType()));
309                     } else {
310                         resultOutput.put(definition.getName(),
311                                 getColValue(row, definition.getName(), definition.getType()));
312                     }
313                 }
314             }
315             resultMap.put("row " + counter, resultOutput);
316             counter++;
317         }
318         return resultMap;
319     }
320
321
322     // Prepared Statements 1802 additions
323
324     public boolean executePut(PreparedQueryObject queryObject, String consistency)
325             throws MusicServiceException, MusicQueryException {
326         return executePut(queryObject, consistency, 0);
327     }
328     /**
329      * This Method performs DDL and DML operations on Cassandra using specified consistency level
330      *
331      * @param queryObject Object containing cassandra prepared query and values.
332      * @param consistency Specify consistency level for data synchronization across cassandra
333      *        replicas
334      * @return Boolean Indicates operation success or failure
335      * @throws MusicServiceException
336      * @throws MusicQueryException
337      */
338     public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
339             throws MusicServiceException, MusicQueryException {
340
341         boolean result = false;
342         long timeOfWrite = System.currentTimeMillis();
343         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
344             logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
345             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
346                     + queryObject.getQuery() + "]");
347         }
348         logger.debug(EELFLoggerDelegate.applicationLogger,
349                 "In preprared Execute Put: the actual insert query:"
350                         + queryObject.getQuery() + "; the values"
351                         + queryObject.getValues());
352         SimpleStatement preparedInsert = null;
353
354         try {
355             preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
356             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
357                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
358                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
359             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
360                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
361                 if(queryObject.getConsistency() == null)
362                     preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
363                 else
364                     preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
365             } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
366                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
367             }  else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
368                 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
369             } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
370                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
371             }
372             long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
373             preparedInsert.setDefaultTimestamp(timestamp);
374
375             ResultSet rs = session.execute(preparedInsert);
376             result = rs.wasApplied();
377
378         }
379         catch (AlreadyExistsException ae) {
380             // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
381             // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
382             throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
383         } catch ( InvalidQueryException e ) {
384             // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
385             // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
386             throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
387         } catch (Exception e) {
388             // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
389             //     + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
390             throw new MusicServiceException("Executing Session Failure for Request = " + "["
391                     + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
392         }
393         return result;
394     }
395
396     /*   *//**
397      * This method performs DDL operations on Cassandra using consistency level ONE.
398      *
399      * @param queryObject Object containing cassandra prepared query and values.
400      * @return ResultSet
401      * @throws MusicServiceException
402      * @throws MusicQueryException
403      *//*
404     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
405                     throws MusicServiceException, MusicQueryException {
406         CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
407         PreparedStatement preparedEventualGet = null;
408         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
409             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
410             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
411                             + queryObject.getQuery() + "]");
412         }
413         logger.info(EELFLoggerDelegate.applicationLogger,
414                         "Executing Eventual  get query:" + queryObject.getQuery());
415
416         ResultSet results = null;
417         try {
418             if(queryBank.get(queryObject.getQuery()) != null )
419                 preparedEventualGet=queryBank.get(queryObject.getQuery());
420             else {
421                 preparedEventualGet = session.prepare(queryObject.getQuery());
422                 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
423             }
424             if(queryObject.getConsistency() == null) {
425                 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
426             } else {
427                 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
428             }
429             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
430
431         } catch (Exception ex) {
432             logger.error("Exception", ex);
433             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
434             throw new MusicServiceException(ex.getMessage());
435         }
436         return results;
437     }
438
439       *//**
440       *
441       * This method performs DDL operation on Cassandra using consistency level QUORUM.
442       *
443       * @param queryObject Object containing cassandra prepared query and values.
444       * @return ResultSet
445       * @throws MusicServiceException
446       * @throws MusicQueryException
447       *//*
448     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
449                     throws MusicServiceException, MusicQueryException {
450         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
451             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
452             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
453                             + queryObject.getQuery() + "]");
454         }
455         logger.info(EELFLoggerDelegate.applicationLogger,
456                         "Executing Critical get query:" + queryObject.getQuery());
457         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
458         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
459         ResultSet results = null;
460         try {
461             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
462         } catch (Exception ex) {
463             logger.error("Exception", ex);
464             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
465             throw new MusicServiceException(ex.getMessage());
466         }
467         return results;
468
469     }
470        */
471     public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
472         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
473             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
474             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
475                     + queryObject.getQuery() + "]");
476         }
477         ResultSet results = null;
478         try {
479             SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
480             if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
481                     statement.setConsistencyLevel(ConsistencyLevel.ONE);
482             } else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
483                 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
484             } else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_LOCAL_QUORUM)) {
485                 statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
486             }
487
488             results = session.execute(statement);
489
490         } catch (Exception ex) {
491             logger.error(EELFLoggerDelegate.errorLogger, "Execute Get Error" + ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
492                     .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
493             throw new MusicServiceException("Execute Get Error" + ex.getMessage());
494         }
495
496         return results;
497
498     }
499
500     /**
501      * This method performs DDL operations on Cassandra using consistency level ONE.
502      * 
503      * @param queryObject Object containing cassandra prepared query and values.
504      */
505     public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
506             throws MusicServiceException, MusicQueryException {
507         return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
508     }
509
510     /**
511      * 
512      * This method performs DDL operation on Cassandra using consistency level LOCAL_QUORUM.
513      * 
514      * @param queryObject Object containing cassandra prepared query and values.
515      */
516     public ResultSet executeLocalQuorumConsistencyGet(PreparedQueryObject queryObject)
517             throws MusicServiceException, MusicQueryException {
518         return executeGet(queryObject, CONSISTENCY_LEVEL_LOCAL_QUORUM);
519     }
520
521     /**
522      * 
523      * This method performs DDL operation on Cassandra using consistency level QUORUM.
524      * 
525      * @param queryObject Object containing cassandra prepared query and values.
526      */
527     public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
528             throws MusicServiceException, MusicQueryException {
529         return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);
530     }
531
532 }