Improve timestamp and query handling 22/72122/2
authorMohammad Salehe <salehe@cs.toronto.edu>
Wed, 7 Nov 2018 20:56:21 +0000 (15:56 -0500)
committerMohammad Salehe <salehe@cs.toronto.edu>
Wed, 21 Nov 2018 03:42:37 +0000 (22:42 -0500)
Add timeSlot parameter to CassaDataStore.executePut
to prevent inconsistent timestamps

Rename CassaDataStore.executeEventualGet and
CassaDataStore.executeCriticalPut to reflect
their real functionality

Use simple bound statement instead of
prepared queries to improve performance

Change-Id: I439c5279f1c8e645740a9650ab8807c5ffa1725a
Issue-ID: MUSIC-148
Signed-off-by: Mohammad Salehe <salehe@cs.toronto.edu>
src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java
src/main/java/org/onap/music/datastore/CassaDataStore.java
src/main/java/org/onap/music/datastore/CassaLockStore.java
src/main/java/org/onap/music/main/MusicCore.java
src/test/java/org/onap/music/unittests/MusicDataStoreTest.java
src/test/java/org/onap/music/unittests/MusicLockStoreTest.java
src/test/java/org/onap/music/unittests/TestMusicCore.java

index 8aadcba..2bed5fe 100644 (file)
@@ -122,7 +122,7 @@ public class MusicConditional {
                ReturnType lockAcqResult = MusicCore.acquireLock(fullyQualifiedKey, lockId);
                if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
                                try {
-                                       results = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT));
+                                       results = MusicCore.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT));
                                } catch (Exception e) {
                                        return new ReturnType(ResultType.FAILURE, e.getMessage());
                                }
@@ -178,7 +178,7 @@ public class MusicConditional {
                try {
                        ReturnType lockAcqResult = MusicCore.acquireLockWithLease(key, lockId, leasePeriod);
                        if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
-                               Row row  = MusicCore.getDSHandle().executeCriticalGet(queryBank.get(MusicUtil.SELECT)).one();
+                               Row row  = MusicCore.getDSHandle().executeQuorumConsistencyGet(queryBank.get(MusicUtil.SELECT)).one();
                                
                                if(row != null) {
                                        Map<String, String> updatedValues = cascadeColumnUpdateSpecific(row, cascadeColumnValues, casscadeColumnName, planId);
index ec0b258..a56cf63 100644 (file)
@@ -30,7 +30,6 @@ import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.UUID;
 
 import com.datastax.driver.core.*;
 import org.onap.music.eelf.logging.EELFLoggerDelegate;
@@ -44,8 +43,6 @@ import com.datastax.driver.core.ColumnDefinitions.Definition;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.datastax.driver.core.utils.UUIDs;
-import com.sun.jersey.core.util.Base64;
 
 /**
  * @author nelson24
@@ -81,11 +78,12 @@ import com.sun.jersey.core.util.Base64;
  */
 public class CassaDataStore {
 
+    public static final String CONSISTENCY_LEVEL_ONE = "ONE";
+    public static final String CONSISTENCY_LEVEL_QUORUM = "QUORUM";
+
     private Session session;
     private Cluster cluster;
 
-
-
     /**
      * @param session
      */
@@ -94,7 +92,7 @@ public class CassaDataStore {
     }
     
     /**
-     * @param session
+     * @param
      */
     public Session getSession() {
         return session;
@@ -333,22 +331,38 @@ public class CassaDataStore {
         return resultMap;
     }
 
+    /**
+     * This Method performs DDL and DML operations on Cassandra using specified consistency level outside any time-slot
+     *
+     * @param queryObject Object containing cassandra prepared query and values.
+     * @param consistency Specify consistency level for data synchronization across cassandra
+     *        replicas
+     * @return Boolean Indicates operation success or failure
+     * @throws MusicServiceException
+     * @throws MusicQueryException
+     */
+    public boolean executePut(PreparedQueryObject queryObject, String consistency)
+            throws MusicServiceException, MusicQueryException {
+        return executePut(queryObject, consistency, 0);
+    }
 
     // Prepared Statements 1802 additions
     /**
      * This Method performs DDL and DML operations on Cassandra using specified consistency level
      * 
      * @param queryObject Object containing cassandra prepared query and values.
-     * @param consistency Specify consistency level for data synchronization across cassandra
+     * @param consistencyLevel Specify consistency level for data synchronization across cassandra
      *        replicas
+     * @param timeSlot Specify timestamp time-slot
      * @return Boolean Indicates operation success or failure
      * @throws MusicServiceException
      * @throws MusicQueryException
      */
-    public boolean executePut(PreparedQueryObject queryObject, String consistency)
-                    throws MusicServiceException, MusicQueryException {
+    public boolean executePut(PreparedQueryObject queryObject, String consistencyLevel, long timeSlot)
+            throws MusicServiceException, MusicQueryException {
 
         boolean result = false;
+        long timeOfWrite = System.currentTimeMillis();
 
         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
                logger.error(EELFLoggerDelegate.errorLogger, queryObject.getQuery(),AppMessages.QUERYERROR, ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
@@ -359,10 +373,10 @@ public class CassaDataStore {
                         "In preprared Execute Put: the actual insert query:"
                                         + queryObject.getQuery() + "; the values"
                                         + queryObject.getValues());
-        PreparedStatement preparedInsert = null;
+        SimpleStatement statement;
         try {
-               
-                               preparedInsert = session.prepare(queryObject.getQuery());
+
+             statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
         } catch(InvalidQueryException iqe) {
                logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
                throw new MusicQueryException(iqe.getMessage());
@@ -372,18 +386,18 @@ public class CassaDataStore {
         }
         
         try {
-            if (consistency.equalsIgnoreCase(MusicUtil.CRITICAL)) {
+            if (consistencyLevel.equalsIgnoreCase(MusicUtil.CRITICAL)) {
                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing critical put query");
-                preparedInsert.setConsistencyLevel(ConsistencyLevel.QUORUM);
-            } else if (consistency.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
+                statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
+            } else if (consistencyLevel.equalsIgnoreCase(MusicUtil.EVENTUAL)) {
                 logger.info(EELFLoggerDelegate.applicationLogger, "Executing simple put query");
-                preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
+                statement.setConsistencyLevel(ConsistencyLevel.ONE);
             }
 
-            BoundStatement boundStatement = preparedInsert.bind(queryObject.getValues().toArray());
-            boundStatement.setDefaultTimestamp(MusicUtil.v2sTimeStampInMicroseconds(0, System.currentTimeMillis()));
+            long timestamp = MusicUtil.v2sTimeStampInMicroseconds(timeSlot, timeOfWrite);
+            statement.setDefaultTimestamp(timestamp);
 
-            ResultSet rs = session.execute(boundStatement);
+            ResultSet rs = session.execute(statement);
             result = rs.wasApplied();
         }
         catch (AlreadyExistsException ae) {
@@ -401,66 +415,60 @@ public class CassaDataStore {
     }
 
     /**
-     * This method performs DDL operations on Cassandra using consistency level ONE.
-     * 
+     * This method performs DDL operations on Cassandra using consistency specified consistency.
+     *
      * @param queryObject Object containing cassandra prepared query and values.
-     * @return ResultSet
-     * @throws MusicServiceException
-     * @throws MusicQueryException
      */
-    public ResultSet executeEventualGet(PreparedQueryObject queryObject)
-                    throws MusicServiceException, MusicQueryException {
+    public ResultSet executeGet(PreparedQueryObject queryObject, String consistencyLevel)
+            throws MusicServiceException, MusicQueryException {
 
         if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
-               logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-               throw new MusicQueryException("Ill formed queryObject for the request = " + "["
-                            + queryObject.getQuery() + "]");
+            logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+            throw new MusicQueryException("Ill formed queryObject for the request = " + "["
+                    + queryObject.getQuery() + "]");
         }
         logger.info(EELFLoggerDelegate.applicationLogger,
-                        "Executing Eventual  get query:" + queryObject.getQuery());
-       
+                "Executing Eventual get query:" + queryObject.getQuery());
+
         ResultSet results = null;
         try {
-                PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
-             preparedEventualGet.setConsistencyLevel(ConsistencyLevel.ONE);
-             results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
+            SimpleStatement statement = new SimpleStatement(queryObject.getQuery(), queryObject.getValues().toArray());
+
+            if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_ONE)) {
+                statement.setConsistencyLevel(ConsistencyLevel.ONE);
+            }
+            else if (consistencyLevel.equalsIgnoreCase(CONSISTENCY_LEVEL_QUORUM)) {
+                statement.setConsistencyLevel(ConsistencyLevel.QUORUM);
+            }
+
+            results = session.execute(statement);
 
         } catch (Exception ex) {
-               logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-               throw new MusicServiceException(ex.getMessage());
+            logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
+            throw new MusicServiceException(ex.getMessage());
         }
         return results;
     }
 
+    /**
+     * This method performs DDL operations on Cassandra using consistency level ONE.
+     * 
+     * @param queryObject Object containing cassandra prepared query and values.
+     */
+    public ResultSet executeOneConsistencyGet(PreparedQueryObject queryObject)
+                    throws MusicServiceException, MusicQueryException {
+        return executeGet(queryObject, CONSISTENCY_LEVEL_ONE);
+    }
+
     /**
      * 
      * This method performs DDL operation on Cassandra using consistency level QUORUM.
      * 
      * @param queryObject Object containing cassandra prepared query and values.
-     * @return ResultSet
-     * @throws MusicServiceException
-     * @throws MusicQueryException
      */
-    public ResultSet executeCriticalGet(PreparedQueryObject queryObject)
+    public ResultSet executeQuorumConsistencyGet(PreparedQueryObject queryObject)
                     throws MusicServiceException, MusicQueryException {
-        if (!MusicUtil.isValidQueryObject(!queryObject.getValues().isEmpty(), queryObject)) {
-               logger.error(EELFLoggerDelegate.errorLogger, "",AppMessages.QUERYERROR+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-            throw new MusicQueryException("Error processing Prepared Query Object for the request = " + "["
-                            + queryObject.getQuery() + "]");
-        }
-        logger.info(EELFLoggerDelegate.applicationLogger,
-                        "Executing Critical get query:" + queryObject.getQuery());
-        PreparedStatement preparedEventualGet = session.prepare(queryObject.getQuery());
-        preparedEventualGet.setConsistencyLevel(ConsistencyLevel.QUORUM);
-        ResultSet results = null;
-        try {
-            results = session.execute(preparedEventualGet.bind(queryObject.getValues().toArray()));
-        } catch (Exception ex) {
-               logger.error(EELFLoggerDelegate.errorLogger, ex.getMessage(),AppMessages.UNKNOWNERROR+ "[" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-               throw new MusicServiceException(ex.getMessage());
-        }
-        return results;
-
+        return executeGet(queryObject, CONSISTENCY_LEVEL_QUORUM);
     }
 }
 
index 67e9653..c1bf478 100644 (file)
@@ -1,16 +1,13 @@
 package org.onap.music.datastore;
 
 import java.util.List;
-import java.util.UUID;
 
 import org.onap.music.eelf.logging.EELFLoggerDelegate;
 import org.onap.music.exceptions.MusicQueryException;
 import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.main.MusicUtil;
 
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
-import com.datastax.driver.core.utils.UUIDs;
 
 /*
  * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. 
@@ -57,8 +54,7 @@ public class CassaLockStore {
                String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
                                + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, PRIMARY KEY ((key), lockReference) ) "
                                + "WITH CLUSTERING ORDER BY (lockReference ASC);";
-               System.out.println(tabQuery);
-               PreparedQueryObject queryObject = new PreparedQueryObject(); 
+               PreparedQueryObject queryObject = new PreparedQueryObject();
                
                queryObject.appendQueryString(tabQuery);
                boolean result;
@@ -86,7 +82,7 @@ public class CassaLockStore {
 
                queryObject.addValue(lockName);
                queryObject.appendQueryString(selectQuery);
-               ResultSet gqResult = dsHandle.executeEventualGet(queryObject);
+               ResultSet gqResult = dsHandle.executeOneConsistencyGet(queryObject);
                List<Row> latestGuardRow = gqResult.all();
 
                long prevGuard = 0;
@@ -140,7 +136,7 @@ public class CassaLockStore {
                String selectQuery = "select * from "+keyspace+"."+table+" where key='"+key+"' LIMIT 1;";       
         PreparedQueryObject queryObject = new PreparedQueryObject();
         queryObject.appendQueryString(selectQuery);
-               ResultSet results = dsHandle.executeEventualGet(queryObject);
+               ResultSet results = dsHandle.executeOneConsistencyGet(queryObject);
                Row row = results.one();
                String lockReference = "" + row.getLong("lockReference");
                String createTime = row.getString("createTime");
index cf2a47e..f085be0 100644 (file)
@@ -230,7 +230,7 @@ public class MusicCore {
                String query = "select * from "+syncTable+" where key='"+fullyQualifiedKey+"';";
         PreparedQueryObject readQueryObject = new PreparedQueryObject();
         readQueryObject.appendQueryString(query);
-               ResultSet results = getDSHandle().executeCriticalGet(readQueryObject);                  
+               ResultSet results = getDSHandle().executeQuorumConsistencyGet(readQueryObject);
                if (results.all().size() != 0) {
                        logger.info("In acquire lock: Since there was a forcible release, need to sync quorum!");
                        try {
@@ -315,7 +315,7 @@ public class MusicCore {
         selectQuery.addValue(cqlFormattedPrimaryKeyValue);
         ResultSet results = null;
         try {
-            results = getDSHandle().executeCriticalGet(selectQuery);
+            results = getDSHandle().executeQuorumConsistencyGet(selectQuery);
             // write it back to a quorum
             Row row = results.one();
             ColumnDefinitions colInfo = row.getColumnDefinitions();
@@ -356,7 +356,7 @@ public class MusicCore {
     public static ResultSet quorumGet(PreparedQueryObject query) {
         ResultSet results = null;
         try {
-            results = getDSHandle().executeCriticalGet(query);
+            results = getDSHandle().executeQuorumConsistencyGet(query);
         } catch (MusicServiceException | MusicQueryException e) {
                logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR ,ErrorSeverity.MAJOR, ErrorTypes.GENERALSERVICEERROR);
         
@@ -518,15 +518,9 @@ public class MusicCore {
                                       + e.getMessage());
             }
                 
-          String query = queryObject.getQuery();
-            long timeOfWrite = System.currentTimeMillis();
             long lockOrdinal = Long.parseLong(lockReference);
-            long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
-            // TODO: use Statement instead of modifying query
-            query = query.replaceFirst("SET", "USING TIMESTAMP "+ ts + " SET");
-         queryObject.replaceQueryString(query);
             CassaDataStore dsHandle = getDSHandle();
-            dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
+            dsHandle.executePut(queryObject, MusicUtil.CRITICAL, lockOrdinal);
           long end = System.currentTimeMillis();
           logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
         }catch (MusicQueryException | MusicServiceException | MusicLockingException  e) {
@@ -572,7 +566,7 @@ public class MusicCore {
     public static ResultSet get(PreparedQueryObject queryObject) throws MusicServiceException {
         ResultSet results = null;
         try {
-                       results = getDSHandle().executeEventualGet(queryObject);
+                       results = getDSHandle().executeOneConsistencyGet(queryObject);
         } catch (MusicQueryException | MusicServiceException e) {
             logger.error(EELFLoggerDelegate.errorLogger,e.getMessage());
             throw new MusicServiceException(e.getMessage());
@@ -599,7 +593,7 @@ public class MusicCore {
             ReturnType result = isTopOfLockStore(keyspace, table, primaryKeyValue, lockReference);
             if(result.getResult().equals(ResultType.FAILURE))
                        return null;//not top of the lock store q
-                results = getDSHandle().executeCriticalGet(queryObject);
+                results = getDSHandle().executeQuorumConsistencyGet(queryObject);
         } catch (MusicQueryException | MusicServiceException | MusicLockingException e) {
                        logger.error(EELFLoggerDelegate.errorLogger,e.getMessage(), AppMessages.UNKNOWNERROR  ,ErrorSeverity.WARN, ErrorTypes.MUSICSERVICEERROR);
         }
index 3f7fd3b..b2c6df8 100644 (file)
@@ -29,7 +29,6 @@ import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
-import org.mockito.Mock;
 import org.onap.music.exceptions.MusicQueryException;
 import org.onap.music.exceptions.MusicServiceException;
 
@@ -105,7 +104,7 @@ public class MusicDataStoreTest {
         boolean result = false;
         int count = 0;
         ResultSet output = null;
-        output = dataStore.executeEventualGet(testObject);
+        output = dataStore.executeOneConsistencyGet(testObject);
         System.out.println(output);
         ;
         for (Row row : output) {
@@ -124,7 +123,7 @@ public class MusicDataStoreTest {
         boolean result = false;
         int count = 0;
         ResultSet output = null;
-        output = dataStore.executeCriticalGet(testObject);
+        output = dataStore.executeQuorumConsistencyGet(testObject);
         System.out.println(output);
         ;
         for (Row row : output) {
index 8677453..a027fd9 100644 (file)
@@ -29,7 +29,6 @@ import org.junit.BeforeClass;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
-import org.mockito.Mock;
 import org.onap.music.exceptions.MusicQueryException;
 import org.onap.music.exceptions.MusicServiceException;
 
@@ -113,7 +112,7 @@ public class MusicLockStoreTest {
         boolean result = false;
         int count = 0;
         ResultSet output = null;
-        output = dataStore.executeEventualGet(testObject);
+        output = dataStore.executeOneConsistencyGet(testObject);
         System.out.println(output);
         ;
         for (Row row : output) {
@@ -132,7 +131,7 @@ public class MusicLockStoreTest {
         boolean result = false;
         int count = 0;
         ResultSet output = null;
-        output = dataStore.executeCriticalGet(testObject);
+        output = dataStore.executeQuorumConsistencyGet(testObject);
         System.out.println(output);
         ;
         for (Row row : output) {
index 01d2ffb..f7b9c0d 100644 (file)
@@ -3,8 +3,6 @@ package org.onap.music.unittests;
 import static org.junit.Assert.assertEquals;
 
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.junit.AfterClass;
@@ -68,7 +66,7 @@ public class TestMusicCore {
         queryObject = new PreparedQueryObject();
         String systemQuery = "SELECT keyspace_name FROM system_schema.keyspaces where keyspace_name='"+keyspace.toLowerCase()+"';";
         queryObject.appendQueryString(systemQuery);
-        ResultSet rs = dataStore.executeEventualGet(queryObject);     
+        ResultSet rs = dataStore.executeOneConsistencyGet(queryObject);
         assert rs.all().size()> 0;
     }
     
@@ -84,7 +82,7 @@ public class TestMusicCore {
         queryObject = new PreparedQueryObject();
         String systemQuery = "SELECT table_name FROM system_schema.tables where keyspace_name='"+keyspace.toLowerCase()+"' and table_name='"+table.toLowerCase()+"';";
         queryObject.appendQueryString(systemQuery);
-        ResultSet rs = dataStore.executeEventualGet(queryObject);
+        ResultSet rs = dataStore.executeOneConsistencyGet(queryObject);
         assert rs.all().size()> 0;
     }