X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fmusic%2Fdatastore%2FMusicDataStore.java;h=2b88b4a8a586e1fe75d4f953cf3fd31a77121d3f;hb=d6e7b63cc580e7b3822be61fe92a493ad5e222a3;hp=38f5fe9fdb88fae7b30c2dedd44fb09a7257274c;hpb=5a79e022277813eba9e92349e51ed0e3c5419dda;p=music.git diff --git a/src/main/java/org/onap/music/datastore/MusicDataStore.java b/src/main/java/org/onap/music/datastore/MusicDataStore.java old mode 100644 new mode 100755 index 38f5fe9f..2b88b4a8 --- a/src/main/java/org/onap/music/datastore/MusicDataStore.java +++ b/src/main/java/org/onap/music/datastore/MusicDataStore.java @@ -4,21 +4,25 @@ * =================================================================== * Copyright (c) 2017 AT&T Intellectual Property * =================================================================== + * Modifications Copyright (c) 2018-2019 IBM + * Modifications Copyright (c) 2019 Samsung + * =================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * + * * ============LICENSE_END============================================= * ==================================================================== */ + package org.onap.music.datastore; import java.net.InetAddress; @@ -30,6 +34,9 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; + +import org.apache.commons.jcs.access.CacheAccess; +import org.onap.music.authentication.CachingUtil; import org.onap.music.eelf.logging.EELFLoggerDelegate; import org.onap.music.eelf.logging.format.AppMessages; import org.onap.music.eelf.logging.format.ErrorSeverity; @@ -37,6 +44,7 @@ import org.onap.music.eelf.logging.format.ErrorTypes; import org.onap.music.exceptions.MusicQueryException; import org.onap.music.exceptions.MusicServiceException; import org.onap.music.main.MusicUtil; +import com.codahale.metrics.JmxReporter; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ColumnDefinitions; import com.datastax.driver.core.ColumnDefinitions.Definition; @@ -50,6 +58,7 @@ import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.TableMetadata; import com.datastax.driver.core.exceptions.AlreadyExistsException; import com.datastax.driver.core.exceptions.InvalidQueryException; @@ -62,18 +71,19 @@ import com.sun.jersey.core.util.Base64; */ public class MusicDataStore { + public static final String CONSISTENCY_LEVEL_ONE = "ONE"; + public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM"; private Session session; private Cluster cluster; - /** * @param session */ public void setSession(Session session) { this.session = session; } - + /** * @param session */ @@ -89,11 +99,10 @@ public class MusicDataStore { } - private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class); /** - * + * */ public MusicDataStore() { connectToCassaCluster(); @@ -110,7 +119,7 @@ public class MusicDataStore { } /** - * + * * @param remoteIp * @throws MusicServiceException */ @@ -118,30 +127,32 @@ public class MusicDataStore { try { connectToCassaCluster(remoteIp); } catch (MusicServiceException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage()); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e); } } /** - * + * * @return */ private ArrayList getAllPossibleLocalIps() { - ArrayList allPossibleIps = new ArrayList(); + ArrayList allPossibleIps = new ArrayList<>(); try { Enumeration en = NetworkInterface.getNetworkInterfaces(); while (en.hasMoreElements()) { - NetworkInterface ni = (NetworkInterface) en.nextElement(); + NetworkInterface ni = en.nextElement(); Enumeration ee = ni.getInetAddresses(); while (ee.hasMoreElements()) { - InetAddress ia = (InetAddress) ee.nextElement(); + InetAddress ia = ee.nextElement(); allPossibleIps.add(ia.getHostAddress()); } } } catch (SocketException e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, + ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e); }catch(Exception e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes + .GENERALSERVICEERROR, e); } return allPossibleIps; } @@ -151,12 +162,12 @@ public class MusicDataStore { * clusters. */ private void connectToCassaCluster() { - Iterator it = getAllPossibleLocalIps().iterator(); + Iterator it = getAllPossibleLocalIps().iterator(); String address = "localhost"; String[] addresses = null; address = MusicUtil.getMyCassaHost(); - addresses = address.split(","); - + addresses = address.split(","); + logger.info(EELFLoggerDelegate.applicationLogger, "Connecting to cassa cluster: Iterating through possible ips:" + getAllPossibleLocalIps()); @@ -166,20 +177,21 @@ public class MusicDataStore { .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); while (it.hasNext()) { try { - if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) { - logger.info(EELFLoggerDelegate.applicationLogger, - "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd()); - cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) - .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) - //.withLoadBalancingPolicy(new RoundRobinPolicy()) - .withPoolingOptions(poolingOptions) - .addContactPoints(addresses).build(); - } - else - cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) - //.withLoadBalancingPolicy(new RoundRobinPolicy()) - .addContactPoints(addresses).build(); - + if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) { + logger.info(EELFLoggerDelegate.applicationLogger, + "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd()); + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .withoutJMXReporting() + .withPoolingOptions(poolingOptions) + .addContactPoints(addresses).build(); + } + else + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .addContactPoints(addresses).build(); + Metadata metadata = cluster.getMetadata(); logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster " + metadata.getClusterName() + " at " + address); @@ -188,13 +200,14 @@ public class MusicDataStore { break; } catch (NoHostAvailableException e) { address = it.next(); - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR); + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE, + ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e); } } } /** - * + * */ public void close() { session.close(); @@ -202,45 +215,55 @@ public class MusicDataStore { /** * This method connects to cassandra cluster on specific address. - * + * * @param address */ private void connectToCassaCluster(String address) throws MusicServiceException { - String[] addresses = null; - addresses = address.split(","); - PoolingOptions poolingOptions = new PoolingOptions(); + String[] addresses = null; + addresses = address.split(","); + PoolingOptions poolingOptions = new PoolingOptions(); poolingOptions .setConnectionsPerHost(HostDistance.LOCAL, 4, 10) .setConnectionsPerHost(HostDistance.REMOTE, 2, 4); if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) { - logger.info(EELFLoggerDelegate.applicationLogger, - "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd()); - cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) - .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) - //.withLoadBalancingPolicy(new RoundRobinPolicy()) - .withPoolingOptions(poolingOptions) - .addContactPoints(addresses).build(); - } - else { - cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) - //.withLoadBalancingPolicy(new RoundRobinPolicy()) - .withPoolingOptions(poolingOptions) - .addContactPoints(addresses).build(); + logger.info(EELFLoggerDelegate.applicationLogger, + "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd()); + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .withoutJMXReporting() + .withPoolingOptions(poolingOptions) + .addContactPoints(addresses).build(); + } else { + cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort()) + //.withLoadBalancingPolicy(new RoundRobinPolicy()) + .withoutJMXReporting() + .withPoolingOptions(poolingOptions) + .addContactPoints(addresses).build(); } + + // JmxReporter reporter = + // JmxReporter.forRegistry(cluster.getMetrics().getRegistry()) + // .inDomain(cluster.getClusterName() + "-metrics") + // .build(); + + // reporter.start(); + Metadata metadata = cluster.getMetadata(); logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster " + metadata.getClusterName() + " at " + address); try { session = cluster.connect(); } catch (Exception ex) { - logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE); + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.CASSANDRACONNECTIVITY, + ErrorSeverity.ERROR, ErrorTypes.SERVICEUNAVAILABLE, ex); throw new MusicServiceException( "Error while connecting to Cassandra cluster.. " + ex.getMessage()); } } /** - * + * * @param keyspace * @param tableName * @param columnName @@ -254,7 +277,7 @@ public class MusicDataStore { } /** - * + * * @param keyspace * @param tableName * @return TableMetadata @@ -263,11 +286,22 @@ public class MusicDataStore { KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace); return ks.getTable(tableName); } + + /** + * + * @param keyspace + * @param tableName + * @return TableMetadata + */ + public KeyspaceMetadata returnKeyspaceMetadata(String keyspace) { + KeyspaceMetadata ks = cluster.getMetadata().getKeyspace(keyspace); + return ks; + } /** * Utility function to return the Java specific object type. - * + * * @param row * @param colName * @param colType @@ -295,16 +329,15 @@ public class MusicDataStore { case MAP: return row.getMap(colName, String.class, String.class); case LIST: - return row.getList(colName, String.class); + return row.getList(colName, String.class); default: return null; } } - + public byte[] getBlobValue(Row row, String colName, DataType colType) { - ByteBuffer bb = row.getBytes(colName); - byte[] data = bb.array(); - return data; + ByteBuffer bb = row.getBytes(colName); + return bb.array(); } public boolean doesRowSatisfyCondition(Row row, Map condition) throws Exception { @@ -315,7 +348,7 @@ public class MusicDataStore { DataType colType = colInfo.getType(colName); Object columnValue = getColValue(row, colName, colType); Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue()); - if (columnValue.equals(conditionValue)) + if (columnValue.equals(conditionValue) == false) return false; } return true; @@ -323,26 +356,26 @@ public class MusicDataStore { /** * Utility function to store ResultSet values in to a MAP for output. - * + * * @param results * @return MAP */ public Map> marshalData(ResultSet results) { Map> resultMap = - new HashMap>(); + new HashMap<>(); int counter = 0; for (Row row : results) { ColumnDefinitions colInfo = row.getColumnDefinitions(); - HashMap resultOutput = new HashMap(); + HashMap resultOutput = new HashMap<>(); for (Definition definition : colInfo) { - if (!"vector_ts".equals(definition.getName())) { - if(definition.getType().toString().toLowerCase().contains("blob")) { - resultOutput.put(definition.getName(), + if (!(("vector_ts").equals(definition.getName()))) { + if(definition.getType().toString().toLowerCase().contains("blob")) { + resultOutput.put(definition.getName(), getBlobValue(row, definition.getName(), definition.getType())); - } - else - resultOutput.put(definition.getName(), + } else { + resultOutput.put(definition.getName(), getColValue(row, definition.getName(), definition.getType())); + } } } resultMap.put("row " + counter, resultOutput); @@ -353,9 +386,14 @@ public class MusicDataStore { // Prepared Statements 1802 additions + + public boolean executePut(PreparedQueryObject queryObject, String consistency) + throws MusicServiceException, MusicQueryException { + return executePut(queryObject, consistency, 0); + } /** * This Method performs DDL and DML operations on Cassandra using specified consistency level - * + * * @param queryObject Object containing cassandra prepared query and values. * @param consistency Specify consistency level for data synchronization across cassandra * replicas @@ -363,13 +401,13 @@ public class MusicDataStore { * @throws MusicServiceException * @throws MusicQueryException */ - public boolean executePut(PreparedQueryObject queryObject, String consistency) + public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot) throws MusicServiceException, MusicQueryException { boolean result = false; - + long timeOfWrite = System.currentTimeMillis(); if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { - logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicQueryException("Ill formed queryObject for the request = " + "[" + queryObject.getQuery() + "]"); } @@ -377,39 +415,42 @@ public class MusicDataStore { "In preprared Execute Put: the actual insert query:" + queryObject.getQuery() + "; the values" + queryObject.getValues()); - PreparedStatement preparedInsert = null; - try { - - preparedInsert = session.prepare(queryObject.getQuery()); - - } catch(InvalidQueryException iqe) { - logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - throw new MusicQueryException(iqe.getMessage()); - }catch(Exception e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR); - throw new MusicQueryException(e.getMessage()); - } - + SimpleStatement preparedInsert = null; + try { + preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray()); if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) { logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query"); preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM); } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) { logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query"); + if(queryObject.getConsistency() == null) + preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE); + else + preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency())); + } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) { preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE); + } else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) { + preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM); + } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) { + preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL); } + long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite); + preparedInsert.setDefaultTimestamp(timestamp); - ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray())); + ResultSet rs = session.execute(preparedInsert); result = rs.wasApplied(); } catch (AlreadyExistsException ae) { - logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicServiceException(ae.getMessage()); + logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ae); + throw new MusicServiceException(ae.getMessage()); } catch (Exception e) { - logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicQueryException("Executing Session Failure for Request = " + "[" + logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED + " [" + + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e); + throw new MusicServiceException("Executing Session Failure for Request = " + "[" + queryObject.getQuery() + "]" + " Reason = " + e.getMessage()); } @@ -417,51 +458,62 @@ public class MusicDataStore { return result; } - /** + /* *//** * This method performs DDL operations on Cassandra using consistency level ONE. - * + * * @param queryObject Object containing cassandra prepared query and values. * @return ResultSet * @throws MusicServiceException * @throws MusicQueryException - */ + *//* public ResultSet executeEventualGet(PreparedQueryObject queryObject) throws MusicServiceException, MusicQueryException { - + CacheAccess queryBank = CachingUtil.getStatementBank(); + PreparedStatement preparedEventualGet = null; if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { - logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicQueryException("Ill formed queryObject for the request = " + "[" + logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + throw new MusicQueryException("Ill formed queryObject for the request = " + "[" + queryObject.getQuery() + "]"); } logger.info(EELFLoggerDelegate.applicationLogger, "Executing Eventual get query:" + queryObject.getQuery()); - + ResultSet results = null; try { - PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery()); - preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE); - results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray())); + if(queryBank.get(queryObject.getQuery()) != null ) + preparedEventualGet=queryBank.get(queryObject.getQuery()); + else { + preparedEventualGet = session.prepare(queryObject.getQuery()); + CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet); + } + if(queryObject.getConsistency() == null) { + preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE); + } else { + preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency())); + } + results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray())); } catch (Exception ex) { - logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicServiceException(ex.getMessage()); + logger.error("Exception", ex); + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + throw new MusicServiceException(ex.getMessage()); } return results; } - /** - * + *//** + * * This method performs DDL operation on Cassandra using consistency level QUORUM. - * + * * @param queryObject Object containing cassandra prepared query and values. * @return ResultSet * @throws MusicServiceException * @throws MusicQueryException - */ + *//* public ResultSet executeCriticalGet(PreparedQueryObject queryObject) throws MusicServiceException, MusicQueryException { if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { - logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "[" + queryObject.getQuery() + "]"); } @@ -473,11 +525,66 @@ public class MusicDataStore { try { results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray())); } catch (Exception ex) { - logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); - throw new MusicServiceException(ex.getMessage()); + logger.error("Exception", ex); + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + throw new MusicServiceException(ex.getMessage()); } return results; } + */ + public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException { + if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) { + logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR); + throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "[" + + queryObject.getQuery() + "]"); + } + ResultSet results = null; + try { + SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray()); + + if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) { + if(queryObject.getConsistency() == null) { + statement.setConsistencyLevel(ConsistencyLevel.ONE); + } else { + statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency())); + } + } + else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) { + statement.setConsistencyLevel(ConsistencyLevel.QUORUM); + } + + results = session.execute(statement); + + } catch (Exception ex) { + logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject + .getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, ex); + throw new MusicServiceException(ex.getMessage()); + } + + return results; + + } + + /** + * This method performs DDL operations on Cassandra using consistency level ONE. + * + * @param queryObject Object containing cassandra prepared query and values. + */ + public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject) + throws MusicServiceException, MusicQueryException { + return executeGet(queryObject, CONSISTENCY_LEVEL_ONE); + } + + /** + * + * This method performs DDL operation on Cassandra using consistency level QUORUM. + * + * @param queryObject Object containing cassandra prepared query and values. + */ + public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject) + throws MusicServiceException, MusicQueryException { + return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM); + } }