97fc1d33782d980a231535def16ccbae7ec0f60c
[music.git] / music-core / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Modifications Copyright (c) 2018-2019 IBM
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.datastore;
27
28 import java.net.InetAddress;
29 import java.net.NetworkInterface;
30 import java.net.SocketException;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Enumeration;
34 import java.util.HashMap;
35 import java.util.Iterator;
36 import java.util.Map;
37
38 import org.onap.music.eelf.logging.EELFLoggerDelegate;
39 import org.onap.music.eelf.logging.format.AppMessages;
40 import org.onap.music.eelf.logging.format.ErrorSeverity;
41 import org.onap.music.eelf.logging.format.ErrorTypes;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.lockingservice.cassandra.LockType;
45 import org.onap.music.main.CipherUtil;
46 import org.onap.music.main.MusicUtil;
47 import com.datastax.driver.core.Cluster;
48 import com.datastax.driver.core.ColumnDefinitions;
49 import com.datastax.driver.core.ColumnDefinitions.Definition;
50 import com.datastax.driver.core.ConsistencyLevel;
51 import com.datastax.driver.core.DataType;
52 import com.datastax.driver.core.HostDistance;
53 import com.datastax.driver.core.KeyspaceMetadata;
54 import com.datastax.driver.core.Metadata;
55 import com.datastax.driver.core.PoolingOptions;
56 import com.datastax.driver.core.ResultSet;
57 import com.datastax.driver.core.Row;
58 import com.datastax.driver.core.Session;
59 import com.datastax.driver.core.SimpleStatement;
60 import com.datastax.driver.core.TableMetadata;
61 import com.datastax.driver.core.TypeCodec;
62 import com.datastax.driver.core.exceptions.AlreadyExistsException;
63 import com.datastax.driver.core.exceptions.InvalidQueryException;
64 import com.datastax.driver.core.exceptions.NoHostAvailableException;
65 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
66 import com.datastax.driver.extras.codecs.enums.EnumOrdinalCodec;
67
68 /**
69  * @author nelson24
70  *
71  */
72 public class MusicDataStore {
73
74     public static final String CONSISTENCY_LEVEL_ONE = "ONE";
75     public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
76     private Session session;
77     private Cluster cluster;
78
79
80     /**
81      * Connect to default Cassandra address
82      */
83     public MusicDataStore() {
84         try {
85             connectToCassaCluster(MusicUtil.getMyCassaHost());
86         } catch (MusicServiceException e) {
87             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
88         }
89     }
90
91
92     /**
93      * @param cluster
94      * @param session
95      */
96     public MusicDataStore(Cluster cluster, Session session) {
97         this.session = session;
98         setCluster(cluster);
99     }
100
101     
102     /**
103      * @param session
104      */
105     public void setSession(Session session) {
106         this.session = session;
107     }
108
109     /**
110      * @param session
111      */
112     public Session getSession() {
113         return session;
114     }
115
116     /**
117      * @param cluster
118      */
119     public void setCluster(Cluster cluster) {
120         EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
121         cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
122         
123         this.cluster = cluster;
124     }
125     
126     public Cluster getCluster() {
127         return this.cluster;
128     }
129
130
131     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
132
133     
134     /**
135      *
136      * @param remoteIp
137      * @throws MusicServiceException
138      */
139     public MusicDataStore(String remoteIp) {
140         try {
141             connectToCassaCluster(remoteIp);
142         } catch (MusicServiceException e) {
143             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
144         }
145     }
146
147     /**
148      *
149      */
150     public void close() {
151         session.close();
152     }
153
154     /**
155      * This method connects to cassandra cluster on specific address.
156      *
157      * @param address
158      */
159     private void connectToCassaCluster(String address) throws MusicServiceException {
160         String[] addresses = null;
161         addresses = address.split(",");
162         PoolingOptions poolingOptions = new PoolingOptions();
163         poolingOptions
164         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
165         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
166         
167         Cluster cluster;
168         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
169             String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
170             logger.info(EELFLoggerDelegate.applicationLogger,
171                     "Building with credentials "+MusicUtil.getCassName()+" & "+ MusicUtil.getCassPwd());
172             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
173                         .withCredentials(MusicUtil.getCassName(), cassPwd)
174                         //.withLoadBalancingPolicy(new RoundRobinPolicy())
175                         .withoutJMXReporting()
176                         .withPoolingOptions(poolingOptions)
177                         .addContactPoints(addresses).build();
178         } else {
179             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
180                         .withoutJMXReporting()
181                         .withPoolingOptions(poolingOptions)
182                         .addContactPoints(addresses)
183                         .build();
184         }
185         
186         this.setCluster(cluster);
187         Metadata metadata = this.cluster.getMetadata();
188         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
189                         + metadata.getClusterName() + " at " + address);
190
191         try {
192             session = this.cluster.connect();
193         } catch (Exception ex) {
194             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
195                 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
196             throw new MusicServiceException(
197                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
198         }
199     }
200
201     /**
202      *
203      * @param keyspace
204      * @param tableName
205      * @param columnName
206      * @return DataType
207      */
208     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
209         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
210         TableMetadata table = ks.getTable(tableName);
211         return table.getColumn(columnName).getType();
212
213     }
214
215     /**
216      *
217      * @param keyspace
218      * @param tableName
219      * @return TableMetadata
220      */
221     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
222         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
223         return ks.getTable(tableName);
224     }
225     
226     /**
227     *
228     * @param keyspace
229     * @param tableName
230     * @return TableMetadata
231     */
232    public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
233        return cluster.getMetadata().getKeyspace(keyspace);
234    }
235
236
237     /**
238      * Utility function to return the Java specific object type.
239      *
240      * @param row
241      * @param colName
242      * @param colType
243      * @return
244      */
245     public Object getColValue(Row row, String colName, DataType colType) {
246
247         switch (colType.getName()) {
248             case VARCHAR:
249                 return row.getString(colName);
250             case UUID:
251                 return row.getUUID(colName);
252             case VARINT:
253                 return row.getVarint(colName);
254             case BIGINT:
255                 return row.getLong(colName);
256             case INT:
257                 return row.getInt(colName);
258             case FLOAT:
259                 return row.getFloat(colName);
260             case DOUBLE:
261                 return row.getDouble(colName);
262             case BOOLEAN:
263                 return row.getBool(colName);
264             case MAP:
265                 return row.getMap(colName, String.class, String.class);
266             case LIST:
267                 return row.getList(colName, String.class);
268             default:
269                 return null;
270         }
271     }
272
273     public byte[] getBlobValue(Row row, String colName, DataType colType) {
274         ByteBuffer bb = row.getBytes(colName);
275         return bb.array();
276     }
277
278     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
279         ColumnDefinitions colInfo = row.getColumnDefinitions();
280
281         for (Map.Entry<String, Object> entry : condition.entrySet()) {
282             String colName = entry.getKey();
283             DataType colType = colInfo.getType(colName);
284             Object columnValue = getColValue(row, colName, colType);
285             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
286             if (columnValue.equals(conditionValue) == false)
287                 return false;
288         }
289         return true;
290     }
291
292     /**
293      * Utility function to store ResultSet values in to a MAP for output.
294      *
295      * @param results
296      * @return MAP
297      */
298     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
299         Map<String, HashMap<String, Object>> resultMap =
300                         new HashMap<>();
301         int counter = 0;
302         for (Row row : results) {
303             ColumnDefinitions colInfo = row.getColumnDefinitions();
304             HashMap<String, Object> resultOutput = new HashMap<>();
305             for (Definition definition : colInfo) {
306                 if (!(("vector_ts").equals(definition.getName()))) {
307                     if(definition.getType().toString().toLowerCase().contains("blob")) {
308                         resultOutput.put(definition.getName(),
309                                 getBlobValue(row, definition.getName(), definition.getType()));
310                     } else {
311                         resultOutput.put(definition.getName(),
312                                     getColValue(row, definition.getName(), definition.getType()));
313                     }
314                 }
315             }
316             resultMap.put("row " + counter, resultOutput);
317             counter++;
318         }
319         return resultMap;
320     }
321
322
323     // Prepared Statements 1802 additions
324     
325     public boolean executePut(PreparedQueryObject queryObject, String consistency)
326             throws MusicServiceException, MusicQueryException {
327         return executePut(queryObject, consistency, 0);
328     }
329     /**
330      * This Method performs DDL and DML operations on Cassandra using specified consistency level
331      *
332      * @param queryObject Object containing cassandra prepared query and values.
333      * @param consistency Specify consistency level for data synchronization across cassandra
334      *        replicas
335      * @return Boolean Indicates operation success or failure
336      * @throws MusicServiceException
337      * @throws MusicQueryException
338      */
339     public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
340                     throws MusicServiceException, MusicQueryException {
341
342         boolean result = false;
343         long timeOfWrite = System.currentTimeMillis();
344         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
345             logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
346             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
347                             + queryObject.getQuery() + "]");
348         }
349         logger.debug(EELFLoggerDelegate.applicationLogger,
350                         "In preprared Execute Put: the actual insert query:"
351                                         + queryObject.getQuery() + "; the values"
352                                         + queryObject.getValues());
353         SimpleStatement preparedInsert = null;
354
355         try {
356             preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
357             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
358                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
359                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
360             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
361                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
362                 if(queryObject.getConsistency() == null)
363                     preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
364                 else
365                     preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
366             } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
367                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
368             }  else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
369                 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
370             } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
371                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
372             }
373             long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
374             preparedInsert.setDefaultTimestamp(timestamp);
375
376             ResultSet rs = session.execute(preparedInsert);
377             result = rs.wasApplied();
378
379         }
380         catch (AlreadyExistsException ae) {
381             // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
382             // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
383             throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
384         } catch ( InvalidQueryException e ) {
385             // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
386             // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
387             throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
388         } catch (Exception e) {
389             // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
390             //     + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
391             throw new MusicServiceException("Executing Session Failure for Request = " + "["
392                 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
393         }
394         return result;
395     }
396
397  /*   *//**
398      * This method performs DDL operations on Cassandra using consistency level ONE.
399      *
400      * @param queryObject Object containing cassandra prepared query and values.
401      * @return ResultSet
402      * @throws MusicServiceException
403      * @throws MusicQueryException
404      *//*
405     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
406                     throws MusicServiceException, MusicQueryException {
407         CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
408         PreparedStatement preparedEventualGet = null;
409         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
410             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
411             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
412                             + queryObject.getQuery() + "]");
413         }
414         logger.info(EELFLoggerDelegate.applicationLogger,
415                         "Executing Eventual  get query:" + queryObject.getQuery());
416
417         ResultSet results = null;
418         try {
419             if(queryBank.get(queryObject.getQuery()) != null )
420                 preparedEventualGet=queryBank.get(queryObject.getQuery());
421             else {
422                 preparedEventualGet = session.prepare(queryObject.getQuery());
423                 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
424             }
425             if(queryObject.getConsistency() == null) {
426                 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
427             } else {
428                 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
429             }
430             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
431
432         } catch (Exception ex) {
433             logger.error("Exception", ex);
434             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
435             throw new MusicServiceException(ex.getMessage());
436         }
437         return results;
438     }
439
440     *//**
441      *
442      * This method performs DDL operation on Cassandra using consistency level QUORUM.
443      *
444      * @param queryObject Object containing cassandra prepared query and values.
445      * @return ResultSet
446      * @throws MusicServiceException
447      * @throws MusicQueryException
448      *//*
449     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
450                     throws MusicServiceException, MusicQueryException {
451         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
452             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
453             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
454                             + queryObject.getQuery() + "]");
455         }
456         logger.info(EELFLoggerDelegate.applicationLogger,
457                         "Executing Critical get query:" + queryObject.getQuery());
458         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
459         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
460         ResultSet results = null;
461         try {
462             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
463         } catch (Exception ex) {
464             logger.error("Exception", ex);
465             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
466             throw new MusicServiceException(ex.getMessage());
467         }
468         return results;
469
470     }
471     */
472     public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
473         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
474             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
475             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
476                             + queryObject.getQuery() + "]");
477         }
478         ResultSet results = null;
479         try {
480             SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
481
482             if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
483                 if(queryObject.getConsistency() == null) {
484                     statement.setConsistencyLevel(ConsistencyLevel.ONE);
485                 } else {
486                     statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
487                 }
488             }
489             else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
490                 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
491             }
492
493             results = session.execute(statement);
494
495         } catch (Exception ex) {
496             logger.error(EELFLoggerDelegate.errorLogger, "Execute Get Error" + ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
497                 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
498             throw new MusicServiceException("Execute Get Error" + ex.getMessage());
499         }
500         
501         return results;
502         
503     }
504     
505     /**
506      * This method performs DDL operations on Cassandra using consistency level ONE.
507      * 
508      * @param queryObject Object containing cassandra prepared query and values.
509      */
510     public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
511                     throws MusicServiceException, MusicQueryException {
512         return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
513     }
514
515     /**
516      * 
517      * This method performs DDL operation on Cassandra using consistency level QUORUM.
518      * 
519      * @param queryObject Object containing cassandra prepared query and values.
520      */
521     public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
522                     throws MusicServiceException, MusicQueryException {
523         return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);
524     }
525
526 }