Merge "Update junits"
[music.git] / music-core / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Modifications Copyright (c) 2018-2019 IBM
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.datastore;
27
28 import java.nio.ByteBuffer;
29 import java.util.HashMap;
30 import java.util.Map;
31
32 import org.onap.music.eelf.logging.EELFLoggerDelegate;
33 import org.onap.music.eelf.logging.format.AppMessages;
34 import org.onap.music.eelf.logging.format.ErrorSeverity;
35 import org.onap.music.eelf.logging.format.ErrorTypes;
36 import org.onap.music.exceptions.MusicQueryException;
37 import org.onap.music.exceptions.MusicServiceException;
38 import org.onap.music.lockingservice.cassandra.LockType;
39 import org.onap.music.main.CipherUtil;
40 import org.onap.music.main.MusicUtil;
41 import com.datastax.driver.core.Cluster;
42 import com.datastax.driver.core.ColumnDefinitions;
43 import com.datastax.driver.core.ColumnDefinitions.Definition;
44 import com.datastax.driver.core.ConsistencyLevel;
45 import com.datastax.driver.core.DataType;
46 import com.datastax.driver.core.HostDistance;
47 import com.datastax.driver.core.KeyspaceMetadata;
48 import com.datastax.driver.core.Metadata;
49 import com.datastax.driver.core.PoolingOptions;
50 import com.datastax.driver.core.ResultSet;
51 import com.datastax.driver.core.Row;
52 import com.datastax.driver.core.Session;
53 import com.datastax.driver.core.SimpleStatement;
54 import com.datastax.driver.core.TableMetadata;
55 import com.datastax.driver.core.exceptions.AlreadyExistsException;
56 import com.datastax.driver.core.exceptions.InvalidQueryException;
57 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
58
59 /**
60  * @author nelson24
61  *
62  */
63 public class MusicDataStore {
64     private Session session;
65     private Cluster cluster;
66
67
68     /**
69      * Connect to default Cassandra address
70      */
71     public MusicDataStore() {
72         try {
73             connectToCassaCluster(MusicUtil.getMyCassaHost());
74         } catch (MusicServiceException e) {
75             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
76         }
77     }
78
79
80     /**
81      * @param cluster
82      * @param session
83      */
84     public MusicDataStore(Cluster cluster, Session session) {
85         this.session = session;
86         setCluster(cluster);
87     }
88
89     
90     /**
91      * @param session
92      */
93     public void setSession(Session session) {
94         this.session = session;
95     }
96
97     /**
98      * @param session
99      */
100     public Session getSession() {
101         return session;
102     }
103
104     /**
105      * @param cluster
106      */
107     public void setCluster(Cluster cluster) {
108         EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
109         cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
110         
111         this.cluster = cluster;
112     }
113     
114     public Cluster getCluster() {
115         return this.cluster;
116     }
117
118
119     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
120
121     
122     /**
123      *
124      * @param remoteIp
125      * @throws MusicServiceException
126      */
127     public MusicDataStore(String remoteIp) {
128         try {
129             connectToCassaCluster(remoteIp);
130         } catch (MusicServiceException e) {
131             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
132         }
133     }
134
135     /**
136      *
137      */
138     public void close() {
139         session.close();
140     }
141
142     /**
143      * This method connects to cassandra cluster on specific address.
144      *
145      * @param address
146      */
147     private void connectToCassaCluster(String address) throws MusicServiceException {
148         String[] addresses = null;
149         addresses = address.split(",");
150         PoolingOptions poolingOptions = new PoolingOptions();
151         poolingOptions
152         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
153         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
154         
155         Cluster cluster;
156         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
157             String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
158             logger.info(EELFLoggerDelegate.applicationLogger,
159                     "Building with credentials "+MusicUtil.getCassName()+" & "+ MusicUtil.getCassPwd());
160             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
161                         .withCredentials(MusicUtil.getCassName(), cassPwd)
162                         //.withLoadBalancingPolicy(new RoundRobinPolicy())
163                         .withoutJMXReporting()
164                         .withPoolingOptions(poolingOptions)
165                         .addContactPoints(addresses).build();
166         } else {
167             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
168                         .withoutJMXReporting()
169                         .withPoolingOptions(poolingOptions)
170                         .addContactPoints(addresses)
171                         .build();
172         }
173         
174         this.setCluster(cluster);
175         Metadata metadata = this.cluster.getMetadata();
176         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
177                         + metadata.getClusterName() + " at " + address);
178
179         try {
180             session = this.cluster.connect();
181         } catch (Exception ex) {
182             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
183                 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
184             throw new MusicServiceException(
185                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
186         }
187     }
188
189     /**
190      *
191      * @param keyspace
192      * @param tableName
193      * @param columnName
194      * @return DataType
195      */
196     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
197         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
198         TableMetadata table = ks.getTable(tableName);
199         return table.getColumn(columnName).getType();
200
201     }
202
203     /**
204      *
205      * @param keyspace
206      * @param tableName
207      * @return TableMetadata
208      */
209     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
210         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
211         return ks.getTable(tableName);
212     }
213     
214     /**
215     *
216     * @param keyspace
217     * @param tableName
218     * @return TableMetadata
219     */
220    public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
221        return cluster.getMetadata().getKeyspace(keyspace);
222    }
223
224
225     /**
226      * Utility function to return the Java specific object type.
227      *
228      * @param row
229      * @param colName
230      * @param colType
231      * @return
232      */
233     public Object getColValue(Row row, String colName, DataType colType) {
234
235         switch (colType.getName()) {
236             case VARCHAR:
237                 return row.getString(colName);
238             case UUID:
239                 return row.getUUID(colName);
240             case VARINT:
241                 return row.getVarint(colName);
242             case BIGINT:
243                 return row.getLong(colName);
244             case INT:
245                 return row.getInt(colName);
246             case FLOAT:
247                 return row.getFloat(colName);
248             case DOUBLE:
249                 return row.getDouble(colName);
250             case BOOLEAN:
251                 return row.getBool(colName);
252             case MAP:
253                 return row.getMap(colName, String.class, String.class);
254             case LIST:
255                 return row.getList(colName, String.class);
256             default:
257                 return null;
258         }
259     }
260
261     public byte[] getBlobValue(Row row, String colName, DataType colType) {
262         ByteBuffer bb = row.getBytes(colName);
263         return bb.array();
264     }
265
266     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
267         ColumnDefinitions colInfo = row.getColumnDefinitions();
268
269         for (Map.Entry<String, Object> entry : condition.entrySet()) {
270             String colName = entry.getKey();
271             DataType colType = colInfo.getType(colName);
272             Object columnValue = getColValue(row, colName, colType);
273             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
274             if (columnValue.equals(conditionValue) == false)
275                 return false;
276         }
277         return true;
278     }
279
280     /**
281      * Utility function to store ResultSet values in to a MAP for output.
282      *
283      * @param results
284      * @return MAP
285      */
286     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
287         Map<String, HashMap<String, Object>> resultMap =
288                         new HashMap<>();
289         int counter = 0;
290         for (Row row : results) {
291             ColumnDefinitions colInfo = row.getColumnDefinitions();
292             HashMap<String, Object> resultOutput = new HashMap<>();
293             for (Definition definition : colInfo) {
294                 if (!(("vector_ts").equals(definition.getName()))) {
295                     if(definition.getType().toString().toLowerCase().contains("blob")) {
296                         resultOutput.put(definition.getName(),
297                                 getBlobValue(row, definition.getName(), definition.getType()));
298                     } else {
299                         resultOutput.put(definition.getName(),
300                                     getColValue(row, definition.getName(), definition.getType()));
301                     }
302                 }
303             }
304             resultMap.put("row " + counter, resultOutput);
305             counter++;
306         }
307         return resultMap;
308     }
309
310
311     // Prepared Statements 1802 additions
312     
313     public boolean executePut(PreparedQueryObject queryObject, String consistency)
314             throws MusicServiceException, MusicQueryException {
315         return executePut(queryObject, consistency, 0);
316     }
317     /**
318      * This Method performs DDL and DML operations on Cassandra using specified consistency level
319      *
320      * @param queryObject Object containing cassandra prepared query and values.
321      * @param consistency Specify consistency level for data synchronization across cassandra
322      *        replicas
323      * @return Boolean Indicates operation success or failure
324      * @throws MusicServiceException
325      * @throws MusicQueryException
326      */
327     public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
328                     throws MusicServiceException, MusicQueryException {
329
330         boolean result = false;
331         long timeOfWrite = System.currentTimeMillis();
332         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
333             logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
334             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
335                             + queryObject.getQuery() + "]");
336         }
337         logger.debug(EELFLoggerDelegate.applicationLogger,
338                         "In preprared Execute Put: the actual insert query:"
339                                         + queryObject.getQuery() + "; the values"
340                                         + queryObject.getValues());
341         SimpleStatement preparedInsert = null;
342
343         try {
344             preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
345             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
346                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
347                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
348             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
349                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
350                 if(queryObject.getConsistency() == null)
351                     preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
352                 else
353                     preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
354             } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
355                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
356             }  else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
357                 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
358             } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
359                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
360             }
361             long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
362             preparedInsert.setDefaultTimestamp(timestamp);
363
364             ResultSet rs = session.execute(preparedInsert);
365             result = rs.wasApplied();
366
367         }
368         catch (AlreadyExistsException ae) {
369             // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
370             // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
371             throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
372         } catch ( InvalidQueryException e ) {
373             // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
374             // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
375             throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
376         } catch (Exception e) {
377             // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
378             //     + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
379             throw new MusicServiceException("Executing Session Failure for Request = " + "["
380                 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
381         }
382         return result;
383     }
384
385  /*   *//**
386      * This method performs DDL operations on Cassandra using consistency level ONE.
387      *
388      * @param queryObject Object containing cassandra prepared query and values.
389      * @return ResultSet
390      * @throws MusicServiceException
391      * @throws MusicQueryException
392      *//*
393     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
394                     throws MusicServiceException, MusicQueryException {
395         CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
396         PreparedStatement preparedEventualGet = null;
397         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
398             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
399             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
400                             + queryObject.getQuery() + "]");
401         }
402         logger.info(EELFLoggerDelegate.applicationLogger,
403                         "Executing Eventual  get query:" + queryObject.getQuery());
404
405         ResultSet results = null;
406         try {
407             if(queryBank.get(queryObject.getQuery()) != null )
408                 preparedEventualGet=queryBank.get(queryObject.getQuery());
409             else {
410                 preparedEventualGet = session.prepare(queryObject.getQuery());
411                 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
412             }
413             if(queryObject.getConsistency() == null) {
414                 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
415             } else {
416                 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
417             }
418             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
419
420         } catch (Exception ex) {
421             logger.error("Exception", ex);
422             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
423             throw new MusicServiceException(ex.getMessage());
424         }
425         return results;
426     }
427
428     *//**
429      *
430      * This method performs DDL operation on Cassandra using consistency level QUORUM.
431      *
432      * @param queryObject Object containing cassandra prepared query and values.
433      * @return ResultSet
434      * @throws MusicServiceException
435      * @throws MusicQueryException
436      *//*
437     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
438                     throws MusicServiceException, MusicQueryException {
439         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
440             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
441             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
442                             + queryObject.getQuery() + "]");
443         }
444         logger.info(EELFLoggerDelegate.applicationLogger,
445                         "Executing Critical get query:" + queryObject.getQuery());
446         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
447         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
448         ResultSet results = null;
449         try {
450             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
451         } catch (Exception ex) {
452             logger.error("Exception", ex);
453             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
454             throw new MusicServiceException(ex.getMessage());
455         }
456         return results;
457
458     }
459     */
460     public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
461         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
462             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
463             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
464                             + queryObject.getQuery() + "]");
465         }
466         ResultSet results = null;
467         try {
468             SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
469
470             if (consistencyLevel.equalsIgnoreCase(MusicUtil.ONE)) {
471                 if(queryObject.getConsistency() == null) {
472                     statement.setConsistencyLevel(ConsistencyLevel.ONE);
473                 } else {
474                     statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
475                 }
476             } else if (consistencyLevel.equalsIgnoreCase(MusicUtil.QUORUM)) {
477                 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
478             } else if (consistencyLevel.equalsIgnoreCase(MusicUtil.LOCAL_QUORUM)) {
479                 statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
480             }
481
482             results = session.execute(statement);
483
484         } catch (Exception ex) {
485             logger.error(EELFLoggerDelegate.errorLogger, "Execute Get Error" + ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
486                 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
487             throw new MusicServiceException("Execute Get Error" + ex.getMessage());
488         }
489         
490         return results;
491         
492     }
493     
494     /**
495      * This method performs DDL operations on Cassandra using consistency level ONE.
496      * 
497      * @param queryObject Object containing cassandra prepared query and values.
498      */
499     public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
500                     throws MusicServiceException, MusicQueryException {
501         return executeGet(queryObject, MusicUtil.ONE);
502     }
503     
504     /**
505      * 
506      * This method performs DDL operation on Cassandra using consistency level LOCAL_QUORUM.
507      * 
508      * @param queryObject Object containing cassandra prepared query and values.
509      */
510     public ResultSet executeLocalQuorumConsistencyGet(PreparedQueryObject queryObject)
511                     throws MusicServiceException, MusicQueryException {
512         return executeGet(queryObject, MusicUtil.LOCAL_QUORUM);
513     }
514     
515     /**
516      * 
517      * This method performs DDL operation on Cassandra using consistency level QUORUM.
518      * 
519      * @param queryObject Object containing cassandra prepared query and values.
520      */
521     public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
522                     throws MusicServiceException, MusicQueryException {
523         return executeGet(queryObject, MusicUtil.QUORUM);
524     }
525     
526 }