Push variuos changes
[music.git] / src / main / java / org / onap / music / datastore / MusicDataStore.java
old mode 100644 (file)
new mode 100755 (executable)
index c6b022c..bb8e537
@@ -9,18 +9,19 @@
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
  *  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  *  Unless required by applicable law or agreed to in writing, software
  *  distributed under the License is distributed on an "AS IS" BASIS,
  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
- * 
+ *
  * ============LICENSE_END=============================================
  * ====================================================================
  */
+
 package org.onap.music.datastore;
 
 import java.net.InetAddress;
@@ -33,14 +34,17 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.commons.jcs.access.CacheAccess;
 import org.onap.music.eelf.logging.EELFLoggerDelegate;
 import org.onap.music.eelf.logging.format.AppMessages;
 import org.onap.music.eelf.logging.format.ErrorSeverity;
 import org.onap.music.eelf.logging.format.ErrorTypes;
 import org.onap.music.exceptions.MusicQueryException;
 import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.main.CachingUtil;
 import org.onap.music.main.MusicUtil;
 
+import com.codahale.metrics.JmxReporter;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.ColumnDefinitions.Definition;
@@ -54,10 +58,12 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.TableMetadata;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.sun.jersey.core.util.Base64;
 
 /**
  * @author nelson24
@@ -65,10 +71,33 @@ import com.datastax.driver.core.exceptions.NoHostAvailableException;
  */
 public class MusicDataStore {
 
+    public static final String CONSISTENCY_LEVEL_ONE = "ONE";
+    public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
     private Session session;
     private Cluster cluster;
 
 
+    /**
+     * @param session
+     */
+    public void setSession(Session session) {
+        this.session = session;
+    }
+
+    /**
+     * @param session
+     */
+    public Session getSession() {
+        return session;
+    }
+
+    /**
+     * @param cluster
+     */
+    public void setCluster(Cluster cluster) {
+        this.cluster = cluster;
+    }
+
 
     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
 
@@ -103,31 +132,8 @@ public class MusicDataStore {
         }
     }
 
-
-
     /**
-     * @param session
-     */
-    public void setSession(Session session) {
-        this.session = session;
-    }
-    
-    /**
-     * @param session
-     */
-    public Session getSession() {
-        return session;
-    }
-
-    /**
-     * @param cluster
-     */
-    public void setCluster(Cluster cluster) {
-        this.cluster = cluster;
-    }
-
-    /**
-     * 
+     *
      * @return
      */
     private ArrayList<String> getAllPossibleLocalIps() {
@@ -147,7 +153,7 @@ public class MusicDataStore {
             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR, ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR);
         }catch(Exception e) {
             logger.error("Exception", e);
-               logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
+            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes.GENERALSERVICEERROR);
         }
         return allPossibleIps;
     }
@@ -157,12 +163,12 @@ public class MusicDataStore {
      * clusters.
      */
     private void connectToCassaCluster() {
-       Iterator<String> it = getAllPossibleLocalIps().iterator();
+        Iterator<String> it = getAllPossibleLocalIps().iterator();
         String address = "localhost";
         String[] addresses = null;
         address = MusicUtil.getMyCassaHost();
-               addresses = address.split(",");
-               
+        addresses = address.split(",");
+
         logger.info(EELFLoggerDelegate.applicationLogger,
                         "Connecting to cassa cluster: Iterating through possible ips:"
                                         + getAllPossibleLocalIps());
@@ -172,20 +178,21 @@ public class MusicDataStore {
         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
         while (it.hasNext()) {
             try {
-               if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
-                       logger.info(EELFLoggerDelegate.applicationLogger,
-                                       "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
-                       cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
-                                          .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
-                                          //.withLoadBalancingPolicy(new RoundRobinPolicy())
-                                          .withPoolingOptions(poolingOptions)
-                                          .addContactPoints(addresses).build();
-               }
-               else
-                       cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
-                                                               //.withLoadBalancingPolicy(new RoundRobinPolicy())
-                                                               .addContactPoints(addresses).build();
-                
+                if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
+                    logger.info(EELFLoggerDelegate.applicationLogger,
+                            "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
+                    cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
+                                       .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
+                                       //.withLoadBalancingPolicy(new RoundRobinPolicy())
+                                       .withoutJMXReporting()
+                                       .withPoolingOptions(poolingOptions)
+                                       .addContactPoints(addresses).build();
+                }
+                else
+                    cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
+                                         //.withLoadBalancingPolicy(new RoundRobinPolicy())
+                                         .addContactPoints(addresses).build();
+
                 Metadata metadata = cluster.getMetadata();
                 logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
                                 + metadata.getClusterName() + " at " + address);
@@ -201,7 +208,7 @@ public class MusicDataStore {
     }
 
     /**
-     * 
+     *
      */
     public void close() {
         session.close();
@@ -209,31 +216,41 @@ public class MusicDataStore {
 
     /**
      * This method connects to cassandra cluster on specific address.
-     * 
+     *
      * @param address
      */
     private void connectToCassaCluster(String address) throws MusicServiceException {
-       String[] addresses = null;
-               addresses = address.split(",");
-               PoolingOptions poolingOptions = new PoolingOptions();
+        String[] addresses = null;
+        addresses = address.split(",");
+        PoolingOptions poolingOptions = new PoolingOptions();
         poolingOptions
         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
-               logger.info(EELFLoggerDelegate.applicationLogger,
-                               "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
-               cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
-                          .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
-                          //.withLoadBalancingPolicy(new RoundRobinPolicy())
-                          .withPoolingOptions(poolingOptions)
-                          .addContactPoints(addresses).build();
+            logger.info(EELFLoggerDelegate.applicationLogger,
+                    "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
+            cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
+                       .withCredentials(MusicUtil.getCassName(), MusicUtil.getCassPwd())
+                       //.withLoadBalancingPolicy(new RoundRobinPolicy())
+                       .withoutJMXReporting()
+                       .withPoolingOptions(poolingOptions)
+                       .addContactPoints(addresses).build();
         }
         else {
-               cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
-                                       //.withLoadBalancingPolicy(new RoundRobinPolicy())
-                                       .withPoolingOptions(poolingOptions)
-                                       .addContactPoints(addresses).build();
+            cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
+                        //.withLoadBalancingPolicy(new RoundRobinPolicy())
+                        .withoutJMXReporting()
+                        .withPoolingOptions(poolingOptions)
+                        .addContactPoints(addresses).build();
         }
+        
+        // JmxReporter reporter =
+        //         JmxReporter.forRegistry(cluster.getMetrics().getRegistry())
+        //             .inDomain(cluster.getClusterName() + "-metrics")
+        //             .build();
+
+        //     reporter.start();
+            
         Metadata metadata = cluster.getMetadata();
         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
                         + metadata.getClusterName() + " at " + address);
@@ -248,7 +265,7 @@ public class MusicDataStore {
     }
 
     /**
-     * 
+     *
      * @param keyspace
      * @param tableName
      * @param columnName
@@ -262,7 +279,7 @@ public class MusicDataStore {
     }
 
     /**
-     * 
+     *
      * @param keyspace
      * @param tableName
      * @return TableMetadata
@@ -275,7 +292,7 @@ public class MusicDataStore {
 
     /**
      * Utility function to return the Java specific object type.
-     * 
+     *
      * @param row
      * @param colName
      * @param colType
@@ -303,15 +320,15 @@ public class MusicDataStore {
             case MAP:
                 return row.getMap(colName, String.class, String.class);
             case LIST:
-               return row.getList(colName, String.class);
+                return row.getList(colName, String.class);
             default:
                 return null;
         }
     }
-    
+
     public byte[] getBlobValue(Row row, String colName, DataType colType) {
-       ByteBuffer bb = row.getBytes(colName);
-       return bb.array();
+        ByteBuffer bb = row.getBytes(colName);
+        return bb.array();
     }
 
     public boolean doesRowSatisfyCondition(Row row, Map<String, Object> condition) throws Exception {
@@ -322,7 +339,7 @@ public class MusicDataStore {
             DataType colType = colInfo.getType(colName);
             Object columnValue = getColValue(row, colName, colType);
             Object conditionValue = MusicUtil.convertToActualDataType(colType, entry.getValue());
-            if (!columnValue.equals(conditionValue))
+            if (columnValue.equals(conditionValue) == false)
                 return false;
         }
         return true;
@@ -330,7 +347,7 @@ public class MusicDataStore {
 
     /**
      * Utility function to store ResultSet values in to a MAP for output.
-     * 
+     *
      * @param results
      * @return MAP
      */
@@ -343,12 +360,12 @@ public class MusicDataStore {
             HashMap<String, Object> resultOutput = new HashMap<>();
             for (Definition definition : colInfo) {
                 if (!(("vector_ts").equals(definition.getName()))) {
-                       if(definition.getType().toString().toLowerCase().contains("blob")) {
-                               resultOutput.put(definition.getName(),
+                    if(definition.getType().toString().toLowerCase().contains("blob")) {
+                        resultOutput.put(definition.getName(),
                                 getBlobValue(row, definition.getName(), definition.getType()));
-                       } 
-                       else
-                               resultOutput.put(definition.getName(),
+                    }
+                    else
+                        resultOutput.put(definition.getName(),
                                     getColValue(row, definition.getName(), definition.getType()));
                 }
             }
@@ -360,9 +377,14 @@ public class MusicDataStore {
 
 
     // Prepared Statements 1802 additions
+    
+    public boolean executePut(PreparedQueryObject queryObject, String consistency)
+            throws MusicServiceException, MusicQueryException {
+        return executePut(queryObject, consistency, 0);
+    }
     /**
      * This Method performs DDL and DML operations on Cassandra using specified consistency level
-     * 
+     *
      * @param queryObject Object containing cassandra prepared query and values.
      * @param consistency Specify consistency level for data synchronization across cassandra
      *        replicas
@@ -370,13 +392,13 @@ public class MusicDataStore {
      * @throws MusicServiceException
      * @throws MusicQueryException
      */
-    public boolean executePut(PreparedQueryObject queryObject, String consistency)
+    public boolean executePut(PreparedQueryObject queryObject, String consistency,long timeSlot)
                     throws MusicServiceException, MusicQueryException {
 
         boolean result = false;
-
+        long timeOfWrite = System.currentTimeMillis();
         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
-               logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+            logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
             throw new MusicQueryException("Ill formed queryObject for the request = " + "["
                             + queryObject.getQuery() + "]");
         }
@@ -384,43 +406,59 @@ public class MusicDataStore {
                         "In preprared Execute Put: the actual insert query:"
                                         + queryObject.getQuery() + "; the values"
                                         + queryObject.getValues());
+/*<<<<<<< HEAD
         PreparedStatement preparedInsert = null;
         try {
-               
-                               preparedInsert = session.prepare(queryObject.getQuery());
-                       
+            
+                preparedInsert = session.prepare(queryObject.getQuery());
+            
         } catch(InvalidQueryException iqe) {
             logger.error("Exception", iqe);
-               logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
-               throw new MusicQueryException(iqe.getMessage());
+            logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+            throw new MusicQueryException(iqe.getMessage());
         }catch(Exception e) {
             logger.error("Exception", e);
-               logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
-               throw new MusicQueryException(e.getMessage());
+            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+            throw new MusicQueryException(e.getMessage());
         }
         
+=======*/
+        SimpleStatement preparedInsert = null;
+
         try {
+            preparedInsert = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
             if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
                 preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
             } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
+                if(queryObject.getConsistency() == null)
+                    preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
+               else
+                    preparedInsert.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
+            } else if (consistency.equalsIgnoreCase(MusicUtil.ONE)) {
                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
+            }  else if (consistency.equalsIgnoreCase(MusicUtil.QUORUM)) {
+                preparedInsert.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+            } else if (consistency.equalsIgnoreCase(MusicUtil.ALL)) {
+                preparedInsert.setConsistencyLevel(ConsistencyLevel.ALL);
             }
+            long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
+            preparedInsert.setDefaultTimestamp(timestamp);
 
-            ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
+            ResultSet rs = session.execute(preparedInsert);
             result = rs.wasApplied();
 
         }
         catch (AlreadyExistsException ae) {
             logger.error("Exception", ae);
             logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-               throw new MusicServiceException(ae.getMessage());
+            throw new MusicServiceException(ae.getMessage());
         }
         catch (Exception e) {
             logger.error("Exception", e);
-               logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-               throw new MusicQueryException("Executing Session Failure for Request = " + "["
+            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+            throw new MusicQueryException("Executing Session Failure for Request = " + "["
                             + queryObject.getQuery() + "]" + " Reason = " + e.getMessage());
         }
 
@@ -428,52 +466,63 @@ public class MusicDataStore {
         return result;
     }
 
   /**
/*   *//**
      * This method performs DDL operations on Cassandra using consistency level ONE.
-     * 
+     *
      * @param queryObject Object containing cassandra prepared query and values.
      * @return ResultSet
      * @throws MusicServiceException
      * @throws MusicQueryException
-     */
+     *//*
     public ResultSet executeEventualGet(PreparedQueryObject queryObject)
                     throws MusicServiceException, MusicQueryException {
-
+        CacheAccess<String, PreparedStatement> queryBank = CachingUtil.getStatementBank();
+        PreparedStatement preparedEventualGet = null;
         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
-               logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-               throw new MusicQueryException("Ill formed queryObject for the request = " + "["
+            logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+            throw new MusicQueryException("Ill formed queryObject for the request = " + "["
                             + queryObject.getQuery() + "]");
         }
         logger.info(EELFLoggerDelegate.applicationLogger,
                         "Executing Eventual  get query:" + queryObject.getQuery());
-       
+
         ResultSet results = null;
         try {
-                PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
-             preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
-             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
+            if(queryBank.get(queryObject.getQuery()) != null )
+                preparedEventualGet=queryBank.get(queryObject.getQuery());
+            else {
+                preparedEventualGet = session.prepare(queryObject.getQuery());
+                CachingUtil.updateStatementBank(queryObject.getQuery(), preparedEventualGet);
+            }
+            if(queryObject.getConsistency() == null) {
+                preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
+            }
+           else {
+               preparedEventualGet.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
+           }
+           results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
 
         } catch (Exception ex) {
             logger.error("Exception", ex);
-               logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-               throw new MusicServiceException(ex.getMessage());
+            logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+            throw new MusicServiceException(ex.getMessage());
         }
         return results;
     }
 
-    /**
-     * 
+    *//**
+     *
      * This method performs DDL operation on Cassandra using consistency level QUORUM.
-     * 
+     *
      * @param queryObject Object containing cassandra prepared query and values.
      * @return ResultSet
      * @throws MusicServiceException
      * @throws MusicQueryException
-     */
+     *//*
     public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
                     throws MusicServiceException, MusicQueryException {
         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
-               logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+            logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
             throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
                             + queryObject.getQuery() + "]");
         }
@@ -486,11 +535,65 @@ public class MusicDataStore {
             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
         } catch (Exception ex) {
             logger.error("Exception", ex);
-               logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-               throw new MusicServiceException(ex.getMessage());
+            logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+            throw new MusicServiceException(ex.getMessage());
+        }
+        return results;
+
+    }
+    */
+    public ResultSet executeGet(PreparedQueryObject queryObject,String consistencyLevel) throws MusicQueryException, MusicServiceException {
+        if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
+            logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+            throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
+                            + queryObject.getQuery() + "]");
+        }
+        ResultSet results = null;
+        try {
+            SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
+
+            if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
+                if(queryObject.getConsistency() == null) {
+                    statement.setConsistencyLevel(ConsistencyLevel.ONE);
+                }
+               else {
+                   statement.setConsistencyLevel(MusicUtil.getConsistencyLevel(queryObject.getConsistency()));
+               }
+            }
+            else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
+                statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
+            }
+
+            results = session.execute(statement);
+
+        } catch (Exception ex) {
+            logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+            throw new MusicServiceException(ex.getMessage());
         }
+        
         return results;
+        
+    }
+    
+    /**
+     * This method performs DDL operations on Cassandra using consistency level ONE.
+     * 
+     * @param queryObject Object containing cassandra prepared query and values.
+     */
+    public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
+                    throws MusicServiceException, MusicQueryException {
+        return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
+    }
 
+    /**
+     * 
+     * This method performs DDL operation on Cassandra using consistency level QUORUM.
+     * 
+     * @param queryObject Object containing cassandra prepared query and values.
+     */
+    public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
+                    throws MusicServiceException, MusicQueryException {
+        return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);
     }
 
 }