Read lock promotion
[music.git] / src / main / java / org / onap / music / datastore / MusicDataStore.java
1 /*
2  * ============LICENSE_START==========================================
3  * org.onap.music
4  * ===================================================================
5  *  Copyright (c) 2017 AT&T Intellectual Property
6  * ===================================================================
7  *  Modifications Copyright (c) 2018-2019 IBM
8  *  Modifications Copyright (c) 2019 Samsung
9  * ===================================================================
10  *  Licensed under the Apache License, Version 2.0 (the "License");
11  *  you may not use this file except in compliance with the License.
12  *  You may obtain a copy of the License at
13  *
14  *     http://www.apache.org/licenses/LICENSE-2.0
15  *
16  *  Unless required by applicable law or agreed to in writing, software
17  *  distributed under the License is distributed on an "AS IS" BASIS,
18  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19  *  See the License for the specific language governing permissions and
20  *  limitations under the License.
21  *
22  * ============LICENSE_END=============================================
23  * ====================================================================
24  */
25
26 package org.onap.music.datastore;
27
28 import java.net.InetAddress;
29 import java.net.NetworkInterface;
30 import java.net.SocketException;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Enumeration;
34 import java.util.HashMap;
35 import java.util.Iterator;
36 import java.util.Map;
37
38 import org.onap.music.eelf.logging.EELFLoggerDelegate;
39 import org.onap.music.eelf.logging.format.AppMessages;
40 import org.onap.music.eelf.logging.format.ErrorSeverity;
41 import org.onap.music.eelf.logging.format.ErrorTypes;
42 import org.onap.music.exceptions.MusicQueryException;
43 import org.onap.music.exceptions.MusicServiceException;
44 import org.onap.music.lockingservice.cassandra.LockType;
45 import org.onap.music.main.CipherUtil;
46 import org.onap.music.main.MusicUtil;
47 import com.datastax.driver.core.Cluster;
48 import com.datastax.driver.core.ColumnDefinitions;
49 import com.datastax.driver.core.ColumnDefinitions.Definition;
50 import com.datastax.driver.core.ConsistencyLevel;
51 import com.datastax.driver.core.DataType;
52 import com.datastax.driver.core.HostDistance;
53 import com.datastax.driver.core.KeyspaceMetadata;
54 import com.datastax.driver.core.Metadata;
55 import com.datastax.driver.core.PoolingOptions;
56 import com.datastax.driver.core.ResultSet;
57 import com.datastax.driver.core.Row;
58 import com.datastax.driver.core.Session;
59 import com.datastax.driver.core.SimpleStatement;
60 import com.datastax.driver.core.TableMetadata;
61 import com.datastax.driver.core.TypeCodec;
62 import com.datastax.driver.core.exceptions.AlreadyExistsException;
63 import com.datastax.driver.core.exceptions.InvalidQueryException;
64 import com.datastax.driver.core.exceptions.NoHostAvailableException;
65 import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
66 import com.datastax.driver.extras.codecs.enums.EnumOrdinalCodec;
67
68 /**
69  * @author nelson24
70  *
71  */
72 public class MusicDataStore {
73
74     public static final String CONSISTENCY_LEVEL_ONE = "ONE";
75     public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
76     private Session session;
77     private Cluster cluster;
78
79
80     /**
81      * @param session
82      */
83     public void setSession(Session session) {
84         this.session = session;
85     }
86
87     /**
88      * @param session
89      */
90     public Session getSession() {
91         return session;
92     }
93
94     /**
95      * @param cluster
96      */
97     public void setCluster(Cluster cluster) {
98         this.cluster = cluster;
99     }
100     
101     public Cluster getCluster() {
102         return this.cluster;
103     }
104
105
106     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
107
108     /**
109      * Connect to default Cassandra address
110      */
111     public MusicDataStore() {
112         try {
113             connectToCassaCluster(MusicUtil.getMyCassaHost());
114         } catch (MusicServiceException e) {
115             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
116         }
117     }
118
119
120     /**
121      * @param cluster
122      * @param session
123      */
124     public MusicDataStore(Cluster cluster, Session session) {
125         this.session = session;
126         this.cluster = cluster;
127     }
128
129     /**
130      *
131      * @param remoteIp
132      * @throws MusicServiceException
133      */
134     public MusicDataStore(String remoteIp) {
135         try {
136             connectToCassaCluster(remoteIp);
137         } catch (MusicServiceException e) {
138             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
139         }
140     }
141
142     /**
143      *
144      */
145     public void close() {
146         session.close();
147     }
148
149     /**
150      * This method connects to cassandra cluster on specific address.
151      *
152      * @param address
153      */
154     private void connectToCassaCluster(String address) throws MusicServiceException {
155         String[] addresses = null;
156         addresses = address.split(",");
157         PoolingOptions poolingOptions = new PoolingOptions();
158         poolingOptions
159         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
160         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
161         
162         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
163             String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
164             logger.info(EELFLoggerDelegate.applicationLogger,
165                     "Building with credentials "+MusicUtil.getCassName()+" & "+ MusicUtil.getCassPwd());
166             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
167                         .withCredentials(MusicUtil.getCassName(), cassPwd)
168                         //.withLoadBalancingPolicy(new RoundRobinPolicy())
169                         .withoutJMXReporting()
170                         .withPoolingOptions(poolingOptions)
171                         .addContactPoints(addresses).build();
172         } else {
173             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
174                         .withoutJMXReporting()
175                         .withPoolingOptions(poolingOptions)
176                         .addContactPoints(addresses)
177                         .build();
178         }
179         
180         
181         Metadata metadata = cluster.getMetadata();
182         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
183                         + metadata.getClusterName() + " at " + address);
184         
185         EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
186         cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
187
188         try {
189             session = cluster.connect();
190         } catch (Exception ex) {
191             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY,
192                 ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex);
193             throw new MusicServiceException(
194                             "Error while connecting to Cassandra cluster.. " + ex.getMessage());
195         }
196     }
197
198     /**
199      *
200      * @param keyspace
201      * @param tableName
202      * @param columnName
203      * @return DataType
204      */
205     public DataType returnColumnDataType(String keyspace, String tableName, String columnName) {
206         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
207         TableMetadata table = ks.getTable(tableName);
208         return table.getColumn(columnName).getType();
209
210     }
211
212     /**
213      *
214      * @param keyspace
215      * @param tableName
216      * @return TableMetadata
217      */
218     public TableMetadata returnColumnMetadata(String keyspace, String tableName) {
219         KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace);
220         return ks.getTable(tableName);
221     }
222     
223     /**
224     *
225     * @param keyspace
226     * @param tableName
227     * @return TableMetadata
228     */
229    public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) {
230        return cluster.getMetadata().getKeyspace(keyspace);
231    }
232
233
234     /**
235      * Utility function to return the Java specific object type.
236      *
237      * @param row
238      * @param colName
239      * @param colType
240      * @return
241      */
242     public Object getColValue(Row row, String colName, DataType colType) {
243
244         switch (colType.getName()) {
245             case VARCHAR:
246                 return row.getString(colName);
247             case UUID:
248                 return row.getUUID(colName);
249             case VARINT:
250                 return row.getVarint(colName);
251             case BIGINT:
252                 return row.getLong(colName);
253             case INT:
254                 return row.getInt(colName);
255             case FLOAT:
256                 return row.getFloat(colName);
257             case DOUBLE:
258                 return row.getDouble(colName);
259             case BOOLEAN:
260                 return row.getBool(colName);
261             case MAP:
262                 return row.getMap(colName, String.class, String.class);
263             case LIST:
264                 return row.getList(colName, String.class);
265             default:
266                 return null;
267         }
268     }
269
270     public byte[] getBlobValue(Row row, String colName, DataType colType) {
271         ByteBuffer bb = row.getBytes(colName);
272         return bb.array();
273     }
274
275     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
276         ColumnDefinitions colInfo = row.getColumnDefinitions();
277
278         for (Map.Entry<String, Object> entry : condition.entrySet()) {
279             String colName = entry.getKey();
280             DataType colType = colInfo.getType(colName);
281             Object columnValue = getColValue(row, colName, colType);
282             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
283             if (columnValue.equals(conditionValue) == false)
284                 return false;
285         }
286         return true;
287     }
288
289     /**
290      * Utility function to store ResultSet values in to a MAP for output.
291      *
292      * @param results
293      * @return MAP
294      */
295     public Map<String, HashMap<String, Object>> marshalData(ResultSet results) {
296         Map<String, HashMap<String, Object>> resultMap =
297                         new HashMap<>();
298         int counter = 0;
299         for (Row row : results) {
300             ColumnDefinitions colInfo = row.getColumnDefinitions();
301             HashMap<String, Object> resultOutput = new HashMap<>();
302             for (Definition definition : colInfo) {
303                 if (!(("vector_ts").equals(definition.getName()))) {
304                     if(definition.getType().toString().toLowerCase().contains("blob")) {
305                         resultOutput.put(definition.getName(),
306                                 getBlobValue(row, definition.getName(), definition.getType()));
307                     } else {
308                         resultOutput.put(definition.getName(),
309                                     getColValue(row, definition.getName(), definition.getType()));
310                     }
311                 }
312             }
313             resultMap.put("row " + counter, resultOutput);
314             counter++;
315         }
316         return resultMap;
317     }
318
319
320     // Prepared Statements 1802 additions
321     
322     public boolean executePut(PreparedQueryObject queryObject, String consistency)
323             throws MusicServiceException, MusicQueryException {
324         return executePut(queryObject, consistency, 0);
325     }
326     /**
327      * This Method performs DDL and DML operations on Cassandra using specified consistency level
328      *
329      * @param queryObject Object containing cassandra prepared query and values.
330      * @param consistency Specify consistency level for data synchronization across cassandra
331      *        replicas
332      * @return Boolean Indicates operation success or failure
333      * @throws MusicServiceException
334      * @throws MusicQueryException
335      */
336     public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
337                     throws MusicServiceException, MusicQueryException {
338
339         boolean result = false;
340         long timeOfWrite = System.currentTimeMillis();
341         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
342             logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
343             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
344                             + queryObject.getQuery() + "]");
345         }
346         logger.debug(EELFLoggerDelegate.applicationLogger,
347                         "In preprared Execute Put: the actual insert query:"
348                                         + queryObject.getQuery() + "; the values"
349                                         + queryObject.getValues());
350         SimpleStatement preparedInsert = null;
351
352         try {
353             preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
354             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
355                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
356                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
357             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
358                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
359                 if(queryObject.getConsistency() == null)
360                     preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
361                 else
362                     preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
363             } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
364                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
365             }  else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
366                 preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
367             } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
368                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
369             }
370             long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
371             preparedInsert.setDefaultTimestamp(timestamp);
372
373             ResultSet rs = session.execute(preparedInsert);
374             result = rs.wasApplied();
375
376         }
377         catch (AlreadyExistsException ae) {
378             // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
379             // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
380             throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
381         } catch ( InvalidQueryException e ) {
382             // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
383             // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
384             throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
385         } catch (Exception e) {
386             // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
387             //     + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
388             throw new MusicServiceException("Executing Session Failure for Request = " + "["
389                 + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
390         }
391         return result;
392     }
393
394  /*   *//**
395      * This method performs DDL operations on Cassandra using consistency level ONE.
396      *
397      * @param queryObject Object containing cassandra prepared query and values.
398      * @return ResultSet
399      * @throws MusicServiceException
400      * @throws MusicQueryException
401      *//*
402     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
403                     throws MusicServiceException, MusicQueryException {
404         CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
405         PreparedStatement preparedEventualGet = null;
406         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
407             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
408             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
409                             + queryObject.getQuery() + "]");
410         }
411         logger.info(EELFLoggerDelegate.applicationLogger,
412                         "Executing Eventual  get query:" + queryObject.getQuery());
413
414         ResultSet results = null;
415         try {
416             if(queryBank.get(queryObject.getQuery()) != null )
417                 preparedEventualGet=queryBank.get(queryObject.getQuery());
418             else {
419                 preparedEventualGet = session.prepare(queryObject.getQuery());
420                 CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
421             }
422             if(queryObject.getConsistency() == null) {
423                 preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
424             } else {
425                 preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
426             }
427             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
428
429         } catch (Exception ex) {
430             logger.error("Exception", ex);
431             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
432             throw new MusicServiceException(ex.getMessage());
433         }
434         return results;
435     }
436
437     *//**
438      *
439      * This method performs DDL operation on Cassandra using consistency level QUORUM.
440      *
441      * @param queryObject Object containing cassandra prepared query and values.
442      * @return ResultSet
443      * @throws MusicServiceException
444      * @throws MusicQueryException
445      *//*
446     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
447                     throws MusicServiceException, MusicQueryException {
448         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
449             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
450             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
451                             + queryObject.getQuery() + "]");
452         }
453         logger.info(EELFLoggerDelegate.applicationLogger,
454                         "Executing Critical get query:" + queryObject.getQuery());
455         PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
456         preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
457         ResultSet results = null;
458         try {
459             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
460         } catch (Exception ex) {
461             logger.error("Exception", ex);
462             logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
463             throw new MusicServiceException(ex.getMessage());
464         }
465         return results;
466
467     }
468     */
469     public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
470         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
471             logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
472             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
473                             + queryObject.getQuery() + "]");
474         }
475         ResultSet results = null;
476         try {
477             SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
478
479             if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
480                 if(queryObject.getConsistency() == null) {
481                     statement.setConsistencyLevel(ConsistencyLevel.ONE);
482                 } else {
483                     statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
484                 }
485             }
486             else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
487                 statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
488             }
489
490             results = session.execute(statement);
491
492         } catch (Exception ex) {
493             logger.error(EELFLoggerDelegate.errorLogger, "Execute Get Error" + ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject
494                 .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex);
495             throw new MusicServiceException("Execute Get Error" + ex.getMessage());
496         }
497         
498         return results;
499         
500     }
501     
502     /**
503      * This method performs DDL operations on Cassandra using consistency level ONE.
504      * 
505      * @param queryObject Object containing cassandra prepared query and values.
506      */
507     public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
508                     throws MusicServiceException, MusicQueryException {
509         return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
510     }
511
512     /**
513      * 
514      * This method performs DDL operation on Cassandra using consistency level QUORUM.
515      * 
516      * @param queryObject Object containing cassandra prepared query and values.
517      */
518     public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
519                     throws MusicServiceException, MusicQueryException {
520         return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);
521     }
522
523 }