Implement sequential lock references 93/72093/2
authorMohammad Salehe <salehe@cs.toronto.edu>
Wed, 7 Nov 2018 16:09:50 +0000 (11:09 -0500)
committerMohammad Salehe <salehe@cs.toronto.edu>
Wed, 21 Nov 2018 03:42:37 +0000 (22:42 -0500)
Use guard column for sequential lock references

Move v2sTimeStampInMicroseconds as a static method to MusicUtil

Use v2sTimeStamp in CassaDataStore.executePut

Change-Id: I48b817c4bfe04ec50f5ad6e7cdc91b34fd607feb
Issue-ID: MUSIC-148
Signed-off-by: Mohammad Salehe <salehe@cs.toronto.edu>
src/main/java/org/onap/music/datastore/CassaDataStore.java
src/main/java/org/onap/music/datastore/CassaLockStore.java
src/main/java/org/onap/music/main/MusicCore.java
src/main/java/org/onap/music/main/MusicUtil.java

index 14934f6..ec0b258 100644 (file)
@@ -32,6 +32,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 
+import com.datastax.driver.core.*;
 import org.onap.music.eelf.logging.EELFLoggerDelegate;
 import org.onap.music.eelf.logging.format.AppMessages;
 import org.onap.music.eelf.logging.format.ErrorSeverity;
@@ -39,18 +40,7 @@ import org.onap.music.eelf.logging.format.ErrorTypes;
 import org.onap.music.exceptions.MusicQueryException;
 import org.onap.music.exceptions.MusicServiceException;
 import org.onap.music.main.MusicUtil;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.ColumnDefinitions.Definition;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.KeyspaceMetadata;
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.TableMetadata;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
@@ -373,7 +363,6 @@ public class CassaDataStore {
         try {
                
                                preparedInsert = session.prepare(queryObject.getQuery());
-                       
         } catch(InvalidQueryException iqe) {
                logger.error(EELFLoggerDelegate.errorLogger, iqe.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
                throw new MusicQueryException(iqe.getMessage());
@@ -391,9 +380,11 @@ public class CassaDataStore {
                 preparedInsert.setConsistencyLevel(ConsistencyLevel.ONE);
             }
 
-            ResultSet rs = session.execute(preparedInsert.bind(queryObject.getValues().toArray()));
-            result = rs.wasApplied();
+            BoundStatement boundStatement = preparedInsert.bind(queryObject.getValues().toArray());
+            boundStatement.setDefaultTimestamp(MusicUtil.v2sTimeStampInMicroseconds(0, System.currentTimeMillis()));
 
+            ResultSet rs = session.execute(boundStatement);
+            result = rs.wasApplied();
         }
         catch (AlreadyExistsException ae) {
             logger.error(EELFLoggerDelegate.errorLogger, ae.getMessage(),AppMessages.SESSIONFAILED+ " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
index e03a1c0..67e9653 100644 (file)
@@ -1,5 +1,6 @@
 package org.onap.music.datastore;
 
+import java.util.List;
 import java.util.UUID;
 
 import org.onap.music.eelf.logging.EELFLoggerDelegate;
@@ -77,31 +78,41 @@ public class CassaLockStore {
        public String genLockRefandEnQueue(String keyspace, String table, String lockName) throws MusicServiceException, MusicQueryException {
         logger.info(EELFLoggerDelegate.applicationLogger,
                 "Create lock reference for " +  keyspace + "." + table + "." + lockName);
-               table = "lockQ_" + table;
-               long lockEpochMillis = System.currentTimeMillis();
-               long lockRef = lockEpochMillis;
+        table = "lockQ_" + table;
 
-               logger.info(EELFLoggerDelegate.applicationLogger,
-                               "Created lock reference for " +  keyspace + "." + table + "." + lockName + ":" + lockRef);
 
-        PreparedQueryObject queryObject = new PreparedQueryObject();
-        String defaultQuery = " UPDATE " + keyspace + "." + table + " SET guard=-1 WHERE key=? IF guard = NULL;";
+               PreparedQueryObject queryObject = new PreparedQueryObject();
+               String selectQuery = "SELECT guard FROM " + keyspace + "." + table + " WHERE key=?;";
 
-        queryObject.addValue(lockName);
-        queryObject.appendQueryString(defaultQuery);
-        boolean dqResult = dsHandle.executePut(queryObject, "critical");
-//        System.out.println("dqResult: " + dqResult);
+               queryObject.addValue(lockName);
+               queryObject.appendQueryString(selectQuery);
+               ResultSet gqResult = dsHandle.executeEventualGet(queryObject);
+               List<Row> latestGuardRow = gqResult.all();
+
+               long prevGuard = 0;
+               long lockRef = 1;
+        if (latestGuardRow.size() > 0) {
+               prevGuard = latestGuardRow.get(0).getLong(0);
+               lockRef = prevGuard + 1;
+               }
 
+        long lockEpochMillis = System.currentTimeMillis();
+
+//        System.out.println("guard(" + lockName + "): " + prevGuard + "->" + lockRef);
+               logger.info(EELFLoggerDelegate.applicationLogger,
+                               "Created lock reference for " +  keyspace + "." + table + "." + lockName + ":" + lockRef);
 
         queryObject = new PreparedQueryObject();
                String insQuery = "BEGIN BATCH" +
-                " UPDATE " + keyspace + "." + table + " SET guard=? WHERE key=? IF guard < ?;" +
+                " UPDATE " + keyspace + "." + table +
+                               " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
                 " INSERT INTO " + keyspace + "." + table +
                                "(key, lockReference, createTime, acquireTime) VALUES (?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
 
         queryObject.addValue(lockRef);
         queryObject.addValue(lockName);
-        queryObject.addValue(lockRef);
+        if (prevGuard != 0)
+               queryObject.addValue(prevGuard);
 
         queryObject.addValue(lockName);
         queryObject.addValue(lockRef);
@@ -109,7 +120,6 @@ public class CassaLockStore {
         queryObject.addValue("0");
         queryObject.appendQueryString(insQuery);
         boolean pResult = dsHandle.executePut(queryObject, "critical");
-//        System.out.println("pResult: " + pResult);
                return String.valueOf(lockRef);
        }
        
index d7c5bce..cf2a47e 100644 (file)
@@ -26,7 +26,6 @@ import java.io.StringWriter;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.StringTokenizer;
-import java.util.UUID;
 
 import org.onap.music.datastore.CassaDataStore;
 import org.onap.music.datastore.CassaLockStore;
@@ -520,9 +519,14 @@ public class MusicCore {
             }
                 
           String query = queryObject.getQuery();
-          query = query.replaceFirst("SET", "using TIMESTAMP "+ v2sTimeStampInMicroseconds(lockReference, System.currentTimeMillis())+ " SET");
+            long timeOfWrite = System.currentTimeMillis();
+            long lockOrdinal = Long.parseLong(lockReference);
+            long ts = MusicUtil.v2sTimeStampInMicroseconds(lockOrdinal, timeOfWrite);
+            // TODO: use Statement instead of modifying query
+            query = query.replaceFirst("SET", "USING TIMESTAMP "+ ts + " SET");
          queryObject.replaceQueryString(query);
-         getDSHandle().executePut(queryObject, MusicUtil.CRITICAL);
+            CassaDataStore dsHandle = getDSHandle();
+            dsHandle.executePut(queryObject, MusicUtil.CRITICAL);
           long end = System.currentTimeMillis();
           logger.info(EELFLoggerDelegate.applicationLogger,"Time taken for the critical put:" + (end - start) + " ms");
         }catch (MusicQueryException | MusicServiceException | MusicLockingException  e) {
@@ -617,29 +621,32 @@ public class MusicCore {
     public static ReturnType atomicPut(String keyspaceName, String tableName, String primaryKey,
                     PreparedQueryObject queryObject, Condition conditionInfo) throws MusicLockingException, MusicQueryException, MusicServiceException {
         long start = System.currentTimeMillis();
+
         String fullyQualifiedKey = keyspaceName + "." + tableName + "." + primaryKey;
         String lockReference = createLockReference(fullyQualifiedKey);
         long lockCreationTime = System.currentTimeMillis();
+
         ReturnType lockAcqResult = acquireLock(fullyQualifiedKey, lockReference);
         long lockAcqTime = System.currentTimeMillis();
-        if (lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
-            logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockReference);
-            ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
-                            queryObject, lockReference, conditionInfo);
-            long criticalPutTime = System.currentTimeMillis();
-            voluntaryReleaseLock(fullyQualifiedKey,lockReference);
-            long lockDeleteTime = System.currentTimeMillis();
-            String timingInfo = "|lock creation time:" + (lockCreationTime - start)
-                            + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
-                            + "|critical put time:" + (criticalPutTime - lockAcqTime)
-                            + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
-            criticalPutResult.setTimingInfo(timingInfo);
-            return criticalPutResult;
-        } else {
+
+        if (!lockAcqResult.getResult().equals(ResultType.SUCCESS)) {
             logger.info(EELFLoggerDelegate.applicationLogger,"unable to acquire lock, id " + lockReference);
             voluntaryReleaseLock(fullyQualifiedKey,lockReference);
             return lockAcqResult;
         }
+
+        logger.info(EELFLoggerDelegate.applicationLogger,"acquired lock with id " + lockReference);
+        ReturnType criticalPutResult = criticalPut(keyspaceName, tableName, primaryKey,
+                        queryObject, lockReference, conditionInfo);
+        long criticalPutTime = System.currentTimeMillis();
+        voluntaryReleaseLock(fullyQualifiedKey,lockReference);
+        long lockDeleteTime = System.currentTimeMillis();
+        String timingInfo = "|lock creation time:" + (lockCreationTime - start)
+                        + "|lock accquire time:" + (lockAcqTime - lockCreationTime)
+                        + "|critical put time:" + (criticalPutTime - lockAcqTime)
+                        + "|lock delete time:" + (lockDeleteTime - criticalPutTime) + "|";
+        criticalPutResult.setTimingInfo(timingInfo);
+        return criticalPutResult;
     }
     
 
@@ -789,38 +796,11 @@ public class MusicCore {
         resultMap.put("keyspace",keyspace);
         return resultMap;
     }
-    
-    
-       /**
-        * Given the time of write for an update in a critical section, this method provides a transformed timestamp
-        * that ensures that a previous lock holder who is still alive can never corrupt a later critical section.
-        * The main idea is to us the lock reference to clearly demarcate the timestamps across critical sections. 
-        * @param the UUID lock reference associated with the write. 
-        * @param the long timeOfWrite which is the actual time at which the write took place 
-        * @throws MusicServiceException
-        * @throws MusicQueryException
-        */     
-       private static long v2sTimeStampInMicroseconds(String lockReference, long timeOfWrite) throws MusicServiceException, MusicQueryException{
-        long lockEpochMillis = Long.parseLong(lockReference);
-
-        long lockEternityMillis = lockEpochMillis - MusicUtil.MusicEternityEpochMillis;
-
-        long ts = lockEternityMillis * MusicUtil.MaxCriticalSectionDurationMillis
-                + (timeOfWrite - lockEpochMillis);
-
-        return ts;
-
-//             long test = (lockReferenceUUID.timestamp()-MusicUtil.MusicEternityEpochMillis);
-//             long timeStamp = (lockReferenceUUID.timestamp()-MusicUtil.MusicEternityEpochMillis)*MusicUtil.MaxCriticalSectionDurationMillis
-//                             +timeOfWrite;
-//             return timeStamp;
-
-//             return timeOfWrite*1000;
-       }
 
-       public static void main(String[] args) {
-               String x = "axe top";
-               x = x.replaceFirst("top", "sword");
-               System.out.print(x); //returns sword pickaxe
-       }
+
+//    public static void main(String[] args) {
+//             String x = "axe top";
+//             x = x.replaceFirst("top", "sword");
+//             System.out.print(x); //returns sword pickaxe
+//     }
 }
index 1cfd5fb..a12a090 100755 (executable)
@@ -46,6 +46,8 @@ import org.onap.music.eelf.logging.EELFLoggerDelegate;
 
 import com.datastax.driver.core.DataType;
 import com.sun.jersey.core.util.Base64;
+import org.onap.music.exceptions.MusicQueryException;
+import org.onap.music.exceptions.MusicServiceException;
 
 /**
  * @author nelson24
@@ -99,6 +101,8 @@ public class MusicUtil {
     public static ConcurrentMap<String, Long> zkNodeMap = new ConcurrentHashMap<>();
     
     public static final long MusicEternityEpochMillis = 1533081600000L; // Wednesday, August 1, 2018 12:00:00 AM
+
+    public static final long MaxLockReferenceTimePart = 1000000000000L; // millis after eternity (eq sometime in 2050)
     
     public static final long MaxCriticalSectionDurationMillis = 1L * 24 * 60 * 60 * 1000; // 1 day
 
@@ -596,4 +600,19 @@ public class MusicUtil {
                    MusicUtil.setCassPwd(prop.getProperty("cassandra.password"));
        }
 
+    /**
+     * Given the time of write for an update in a critical section, this method provides a transformed timestamp
+     * that ensures that a previous lock holder who is still alive can never corrupt a later critical section.
+     * The main idea is to us the lock reference to clearly demarcate the timestamps across critical sections.
+     * @param the UUID lock reference associated with the write.
+     * @param the long timeOfWrite which is the actual time at which the write took place
+     * @throws MusicServiceException
+     * @throws MusicQueryException
+     */
+    public static long v2sTimeStampInMicroseconds(long ordinal, long timeOfWrite) throws MusicServiceException, MusicQueryException {
+        // TODO: use acquire time instead of music eternity epoch
+        long ts = ordinal * MaxLockReferenceTimePart + (timeOfWrite - MusicEternityEpochMillis);
+
+        return ts;
+    }
 }