Read lock promotion 48/96248/3
authorTschaen, Brendan <ctschaen@att.com>
Wed, 25 Sep 2019 18:54:46 +0000 (14:54 -0400)
committerTschaen, Brendan <ctschaen@att.com>
Thu, 26 Sep 2019 17:52:01 +0000 (13:52 -0400)
Change-Id: Ib2515c728503fb729e6ecc2e09973bbfa9e2e317
Issue-ID: MUSIC-508
Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
pom.xml
src/main/java/org/onap/music/datastore/MusicDataStore.java
src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
src/main/java/org/onap/music/lockingservice/cassandra/LockType.java
src/main/java/org/onap/music/main/MusicCore.java
src/main/java/org/onap/music/rest/RestMusicLocksAPI.java
src/main/java/org/onap/music/service/MusicCoreService.java
src/main/java/org/onap/music/service/impl/MusicCassaCore.java
src/test/java/org/onap/music/unittests/CassandraCQL.java
src/test/java/org/onap/music/unittests/TstRestMusicLockAPI.java

diff --git a/pom.xml b/pom.xml
index 11b999a..8864c0b 100755 (executable)
--- a/pom.xml
+++ b/pom.xml
             <artifactId>cassandra-driver-core</artifactId>
             <version>${cassandra.version}</version>
         </dependency>
             <artifactId>cassandra-driver-core</artifactId>
             <version>${cassandra.version}</version>
         </dependency>
+        <dependency>
+          <groupId>com.datastax.cassandra</groupId>
+          <artifactId>cassandra-driver-extras</artifactId>
+          <version>${cassandra.version}</version>
+        </dependency>
         <!-- /Cassandra -->
         <!-- Jersey -->
         <dependency>
         <!-- /Cassandra -->
         <!-- Jersey -->
         <dependency>
index 6555ea2..5a65868 100755 (executable)
@@ -41,6 +41,7 @@ import org.onap.music.eelf.logging.format.ErrorSeverity;
 import org.onap.music.eelf.logging.format.ErrorTypes;
 import org.onap.music.exceptions.MusicQueryException;
 import org.onap.music.exceptions.MusicServiceException;
 import org.onap.music.eelf.logging.format.ErrorTypes;
 import org.onap.music.exceptions.MusicQueryException;
 import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.lockingservice.cassandra.LockType;
 import org.onap.music.main.CipherUtil;
 import org.onap.music.main.MusicUtil;
 import com.datastax.driver.core.Cluster;
 import org.onap.music.main.CipherUtil;
 import org.onap.music.main.MusicUtil;
 import com.datastax.driver.core.Cluster;
@@ -57,9 +58,12 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.TableMetadata;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.TypeCodec;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.exceptions.AlreadyExistsException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
+import com.datastax.driver.extras.codecs.enums.EnumOrdinalCodec;
 
 /**
  * @author nelson24
 
 /**
  * @author nelson24
@@ -93,15 +97,23 @@ public class MusicDataStore {
     public void setCluster(Cluster cluster) {
         this.cluster = cluster;
     }
     public void setCluster(Cluster cluster) {
         this.cluster = cluster;
     }
+    
+    public Cluster getCluster() {
+        return this.cluster;
+    }
 
 
     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
 
     /**
 
 
     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicDataStore.class);
 
     /**
-     *
+     * Connect to default Cassandra address
      */
     public MusicDataStore() {
      */
     public MusicDataStore() {
-        connectToCassaCluster();
+        try {
+            connectToCassaCluster(MusicUtil.getMyCassaHost());
+        } catch (MusicServiceException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), e);
+        }
     }
 
 
     }
 
 
@@ -127,82 +139,6 @@ public class MusicDataStore {
         }
     }
 
         }
     }
 
-    /**
-     *
-     * @return
-     */
-    private ArrayList<String> getAllPossibleLocalIps() {
-        ArrayList<String> allPossibleIps = new ArrayList<>();
-        try {
-            Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
-            while (en.hasMoreElements()) {
-                NetworkInterface ni = en.nextElement();
-                Enumeration<InetAddress> ee = ni.getInetAddresses();
-                while (ee.hasMoreElements()) {
-                    InetAddress ia = ee.nextElement();
-                    allPossibleIps.add(ia.getHostAddress());
-                }
-            }
-        } catch (SocketException e) {
-            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.CONNCECTIVITYERROR,
-                ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
-        }catch(Exception e) {
-            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), ErrorSeverity.ERROR, ErrorTypes
-                .GENERALSERVICEERROR, e);
-        }
-        return allPossibleIps;
-    }
-
-    /**
-     * This method iterates through all available IP addresses and connects to multiple cassandra
-     * clusters.
-     */
-    private void connectToCassaCluster() {
-        Iterator<String> it = getAllPossibleLocalIps().iterator();
-        String address = "localhost";
-        String[] addresses = null;
-        address = MusicUtil.getMyCassaHost();
-        addresses = address.split(",");
-
-        logger.info(EELFLoggerDelegate.applicationLogger,
-                        "Connecting to cassa cluster: Iterating through possible ips:"
-                                        + getAllPossibleLocalIps());
-        PoolingOptions poolingOptions = new PoolingOptions();
-        poolingOptions
-        .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
-        .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
-        while (it.hasNext()) {
-            try {
-                if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
-                    String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
-                    logger.info(EELFLoggerDelegate.applicationLogger,
-                            "Building with credentials "+MusicUtil.getCassName()+" & "+MusicUtil.getCassPwd());
-                    cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
-                                        .withCredentials(MusicUtil.getCassName(), cassPwd)
-                                        //.withLoadBalancingPolicy(new RoundRobinPolicy())
-                                        .withoutJMXReporting()
-                                        .withPoolingOptions(poolingOptions)
-                                        .addContactPoints(addresses).build();
-                }
-                else
-                    cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
-                                        //.withLoadBalancingPolicy(new RoundRobinPolicy())
-                                        .addContactPoints(addresses).build();
-
-                Metadata metadata = cluster.getMetadata();
-                logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
-                                + metadata.getClusterName() + " at " + address);
-                session = cluster.connect();
-
-                break;
-            } catch (NoHostAvailableException e) {
-                address = it.next();
-                logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.HOSTUNAVAILABLE,
-                    ErrorSeverity.ERROR, ErrorTypes.CONNECTIONERROR, e);
-            }
-        }
-    }
-
     /**
      *
      */
     /**
      *
      */
@@ -222,6 +158,7 @@ public class MusicDataStore {
         poolingOptions
         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
         poolingOptions
         .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
         .setConnectionsPerHost(HostDistance.REMOTE, 2, 4);
+        
         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
             String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
             logger.info(EELFLoggerDelegate.applicationLogger,
         if(MusicUtil.getCassName() != null && MusicUtil.getCassPwd() != null) {
             String cassPwd = CipherUtil.decryptPKC(MusicUtil.getCassPwd());
             logger.info(EELFLoggerDelegate.applicationLogger,
@@ -236,12 +173,18 @@ public class MusicDataStore {
             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
                         .withoutJMXReporting()
                         .withPoolingOptions(poolingOptions)
             cluster = Cluster.builder().withPort(MusicUtil.getCassandraPort())
                         .withoutJMXReporting()
                         .withPoolingOptions(poolingOptions)
-                        .addContactPoints(addresses).build();
+                        .addContactPoints(addresses)
+                        .build();
         }
         
         }
         
+        
         Metadata metadata = cluster.getMetadata();
         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
                         + metadata.getClusterName() + " at " + address);
         Metadata metadata = cluster.getMetadata();
         logger.info(EELFLoggerDelegate.applicationLogger, "Connected to cassa cluster "
                         + metadata.getClusterName() + " at " + address);
+        
+        EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
+        cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
+
         try {
             session = cluster.connect();
         } catch (Exception ex) {
         try {
             session = cluster.connect();
         } catch (Exception ex) {
index 0ec8507..1089847 100644 (file)
@@ -36,9 +36,12 @@ import org.onap.music.exceptions.MusicServiceException;
 import org.onap.music.main.DeadlockDetectionUtil;
 import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
 import org.onap.music.main.MusicUtil;
 import org.onap.music.main.DeadlockDetectionUtil;
 import org.onap.music.main.DeadlockDetectionUtil.OwnershipType;
 import org.onap.music.main.MusicUtil;
-
+import org.onap.music.main.ResultType;
+import org.onap.music.main.ReturnType;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
 
 /*
  * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. 
 
 /*
  * This is the lock store that is built on top of Cassandra that is used by MUSIC to maintain lock state. 
@@ -126,7 +129,7 @@ public class CassaLockStore {
         table = table_prepend_name+table;
         String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
                 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
         table = table_prepend_name+table;
         String tabQuery = "CREATE TABLE IF NOT EXISTS "+keyspace+"."+table
                 + " ( key text, lockReference bigint, createTime text, acquireTime text, guard bigint static, "
-                + "writeLock boolean, owner text, PRIMARY KEY ((key), lockReference) ) "
+                + "lockType text, owner text, PRIMARY KEY ((key), lockReference) ) "
                 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
         PreparedQueryObject queryObject = new PreparedQueryObject();
         
                 + "WITH CLUSTERING ORDER BY (lockReference ASC);";
         PreparedQueryObject queryObject = new PreparedQueryObject();
         
@@ -176,13 +179,14 @@ public class CassaLockStore {
 
         logger.info(EELFLoggerDelegate.applicationLogger,
                 "Created lock reference for " +  keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
 
         logger.info(EELFLoggerDelegate.applicationLogger,
                 "Created lock reference for " +  keyspace + "." + lockTable + "." + lockName + ":" + lockRef);
-
+        
         queryObject = new PreparedQueryObject();
         queryObject = new PreparedQueryObject();
+        
         String insQuery = "BEGIN BATCH" +
                 " UPDATE " + keyspace + "." + lockTable +
                 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
                 " INSERT INTO " + keyspace + "." + lockTable +
         String insQuery = "BEGIN BATCH" +
                 " UPDATE " + keyspace + "." + lockTable +
                 " SET guard=? WHERE key=? IF guard = " + (prevGuard == 0 ? "NULL" : "?") +";" +
                 " INSERT INTO " + keyspace + "." + lockTable +
-                "(key, lockReference, createTime, acquireTime, writeLock, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
+                "(key, lockReference, createTime, acquireTime, lockType, owner) VALUES (?,?,?,?,?,?) IF NOT EXISTS; APPLY BATCH;";
 
         queryObject.addValue(lockRef);
         queryObject.addValue(lockName);
 
         queryObject.addValue(lockRef);
         queryObject.addValue(lockName);
@@ -193,7 +197,7 @@ public class CassaLockStore {
         queryObject.addValue(lockRef);
         queryObject.addValue(String.valueOf(lockEpochMillis));
         queryObject.addValue("0");
         queryObject.addValue(lockRef);
         queryObject.addValue(String.valueOf(lockEpochMillis));
         queryObject.addValue("0");
-        queryObject.addValue(locktype==LockType.WRITE ? true : false );
+        queryObject.addValue(locktype);
         queryObject.addValue(owner);
         queryObject.appendQueryString(insQuery);
         boolean pResult = dsHandle.executePut(queryObject, "critical");
         queryObject.addValue(owner);
         queryObject.appendQueryString(insQuery);
         boolean pResult = dsHandle.executePut(queryObject, "critical");
@@ -285,7 +289,7 @@ public class CassaLockStore {
         String lockReference = "" + row.getLong("lockReference");
         String createTime = row.getString("createTime");
         String acquireTime = row.getString("acquireTime");
         String lockReference = "" + row.getLong("lockReference");
         String createTime = row.getString("createTime");
         String acquireTime = row.getString("acquireTime");
-        LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+        LockType locktype = row.get("lockType", LockType.class);
         String owner = row.getString("owner");
 
         return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
         String owner = row.getString("owner");
 
         return new LockObject(true, lockReference, createTime, acquireTime, locktype, owner);
@@ -313,7 +317,7 @@ public class CassaLockStore {
                     return lockHolders;
                 }
                 lockReference.append(lock).append(row.getLong("lockReference"));
                     return lockHolders;
                 }
                 lockReference.append(lock).append(row.getLong("lockReference"));
-            if (row.isNull("writeLock") || row.getBool("writeLock")) {
+            if (row.get("lockType", LockType.class)!=LockType.WRITE) {
                 if (topOfQueue) {
                     lockHolders.add(lockReference.toString());
                     break;
                 if (topOfQueue) {
                     lockHolders.add(lockReference.toString());
                     break;
@@ -356,7 +360,7 @@ public class CassaLockStore {
         boolean topOfQueue = true;
         for (Row row : rs) {
             String lockReference = "" + row.getLong("lockReference");
         boolean topOfQueue = true;
         for (Row row : rs) {
             String lockReference = "" + row.getLong("lockReference");
-            if (row.isNull("writeLock") || row.getBool("writeLock")) {
+            if (row.get("lockType", LockType.class)==LockType.WRITE) {
                 if (topOfQueue && lockRef.equals(lockReference)) {
                     return true;
                 } else {
                 if (topOfQueue && lockRef.equals(lockReference)) {
                     return true;
                 } else {
@@ -404,7 +408,7 @@ public class CassaLockStore {
         String lockReference = "" + row.getLong("lockReference");
         String createTime = row.getString("createTime");
         String acquireTime = row.getString("acquireTime");
         String lockReference = "" + row.getLong("lockReference");
         String createTime = row.getString("createTime");
         String acquireTime = row.getString("acquireTime");
-        LockType locktype = row.isNull("writeLock") || row.getBool("writeLock") ? LockType.WRITE : LockType.READ;
+        LockType locktype = row.get("lockType", LockType.class);
         boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
         String owner = row.getString("owner");
 
         boolean isLockOwner = isLockOwner(keyspace, table, key, lockRef);
         String owner = row.getString("owner");
 
@@ -456,11 +460,9 @@ public class CassaLockStore {
 
     public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
         table = table_prepend_name + table;
 
     public void updateLockAcquireTime(String keyspace, String table, String key, String lockReference) {
         table = table_prepend_name + table;
-        PreparedQueryObject queryObject = new PreparedQueryObject();
         Long lockReferenceL = Long.parseLong(lockReference);
         String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
                 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
         Long lockReferenceL = Long.parseLong(lockReference);
         String updateQuery = "update " + keyspace + "." + table + " set acquireTime='" + System.currentTimeMillis()
                 + "' where key='" + key + "' AND lockReference = " + lockReferenceL + " IF EXISTS;";
-        queryObject.appendQueryString(updateQuery);
 
         //cannot use executePut because we need to ignore music timestamp adjustments for lock store
         dsHandle.getSession().execute(updateQuery);
 
         //cannot use executePut because we need to ignore music timestamp adjustments for lock store
         dsHandle.getSession().execute(updateQuery);
@@ -473,7 +475,8 @@ public class CassaLockStore {
         String lockTable = table_prepend_name + table;
         PreparedQueryObject queryObject = new PreparedQueryObject();
         queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
         String lockTable = table_prepend_name + table;
         PreparedQueryObject queryObject = new PreparedQueryObject();
         queryObject.appendQueryString("SELECT key, acquiretime, owner FROM " + keyspace + "." + lockTable);
-        queryObject.appendQueryString(" WHERE writelock = True ALLOW FILTERING");
+        queryObject.appendQueryString(" WHERE lockType = ? ALLOW FILTERING");
+        queryObject.addValue(LockType.WRITE);
 
         DeadlockDetectionUtil ddu = new DeadlockDetectionUtil();
 
 
         DeadlockDetectionUtil ddu = new DeadlockDetectionUtil();
 
@@ -506,5 +509,96 @@ public class CassaLockStore {
         return toRet;
     }
 
         return toRet;
     }
 
+    public ReturnType promoteLock(String keyspace, String table, String key, String lockRef)
+            throws MusicLockingException, MusicServiceException, MusicQueryException {
+        String lockqtable = table_prepend_name + table;
+        String selectQuery = "select * from " + keyspace + "." + lockqtable + " where key=?;";
+
+        PreparedQueryObject queryObject = new PreparedQueryObject();
+        queryObject.appendQueryString(selectQuery);
+        queryObject.addValue(key);
+        ResultSet rs = dsHandle.executeOneConsistencyGet(queryObject);
+        
+        long refToPromote = Long.parseLong(lockRef);
+
+        boolean topOfQueue = true;
+        boolean readBlock = false;
+        boolean seenLockToPromote = false;
+        boolean promotionOngoing = false;
+        long readBlockStart = 0;
+        long readBlockEnd = 0;
+
+
+        for (Row row : rs) {
+            long ref = row.getLong("lockreference");
+            LockType lockType = row.get("lockType", LockType.class);
+            
+            if (refToPromote==ref) {
+                if (promotionOngoing) {
+                    return new ReturnType(ResultType.FAILURE, "Can't promote, already promoting another lockref.");
+                }
+                seenLockToPromote = true;
+                if (!topOfQueue) {
+                    readBlockStart = ref;
+                    readBlockEnd = ref;
+                    break;
+                }
+            } else if (!seenLockToPromote && refToPromote<ref) {
+                return new ReturnType(ResultType.FAILURE, "Lockref does not exist.");
+            }
+            
+            if (lockType==LockType.READ || lockType==LockType.PROMOTING) {
+                if (!readBlock) {
+                    readBlockStart = ref;
+                    readBlock = true;
+                }
+                if (readBlock) {
+                    readBlockEnd = ref;
+                }
+                if (lockType==LockType.PROMOTING) {
+                    promotionOngoing = true;
+                }
+            }
+            
+            if (lockType==LockType.WRITE) {
+                if (refToPromote==ref) {
+                    return new ReturnType(ResultType.FAILURE, "Lockref is already write.");
+                }
+                if (readBlock) {
+                    readBlock = false;
+                    promotionOngoing = false;
+                    if (seenLockToPromote) {
+                        break;
+                    }
+                    //can no longer be lock holder after this
+                    topOfQueue = false;
+                }
+            }
+        }
+
+        if (readBlockStart<=refToPromote && refToPromote<=readBlockEnd) {
+            if (readBlockStart==refToPromote && refToPromote==readBlockEnd) {
+                promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.WRITE);
+                return new ReturnType(ResultType.SUCCESS, "Lock has successfully been upgraded.");
+            }
+            promoteLockTo(keyspace, lockqtable, key, lockRef, LockType.PROMOTING);
+            return new ReturnType(ResultType.FAILURE, "Your lock upgrade is in progress. Check again to see if successful."); 
+        }
+        
+        //shouldn't reach here?
+        return new ReturnType(ResultType.FAILURE,"Promotion failed.");
+    }
+
+    private void promoteLockTo(String keyspace, String table, String key, String lockRef, LockType newLockType)
+            throws MusicServiceException, MusicQueryException {
+        PreparedQueryObject queryObject =
+                new PreparedQueryObject("UPDATE " + keyspace + "." + table + " SET lockType=? WHERE key='" + key
+                        + "' AND lockReference = " + lockRef + " IF EXISTS;", newLockType);
+
+        //cannot use executePut because we need to ignore music timestamp adjustments for lock store
+        dsHandle.executePut(queryObject, MusicUtil.QUORUM);
+    }
+    
+
 
 }
 
 }
index 139d5f3..432b1c5 100644 (file)
@@ -25,6 +25,6 @@ package org.onap.music.lockingservice.cassandra;
 
 
 public enum LockType {
 
 
 public enum LockType {
-       READ, WRITE;
+       WRITE, READ, PROMOTING;
 }
 
 }
 
index e889e18..658f212 100644 (file)
@@ -121,6 +121,10 @@ public class MusicCore {
     public static List<String> getCurrentLockHolders(String fullyQualifiedKey) {
         return musicCore.getCurrentLockHolders(fullyQualifiedKey);
     }
     public static List<String> getCurrentLockHolders(String fullyQualifiedKey) {
         return musicCore.getCurrentLockHolders(fullyQualifiedKey);
     }
+    
+    public static ReturnType promoteLock(String lockIdToPromote) throws MusicLockingException {
+        return musicCore.promoteLock(lockIdToPromote);
+    }
 
     public static void destroyLockRef(String lockId) throws MusicLockingException {
         musicCore.destroyLockRef(lockId);
 
     public static void destroyLockRef(String lockId) throws MusicLockingException {
         musicCore.destroyLockRef(lockId);
index b839c7f..321e256 100644 (file)
@@ -568,4 +568,65 @@ public class RestMusicLocksAPI {
         }
     }
 
         }
     }
 
+    
+    /**
+     * Puts the requesting process in the q for this lock. The corresponding
+     * node will be created if it did not already exist
+     * 
+     * @param lockName
+     * @return
+     * @throws Exception 
+     */
+    @POST
+    @Path("/promote/{lockname}")
+    @ApiOperation(value = "Attempt to promote the lock for a single row.",
+        response = Map.class)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiResponses(value={
+        @ApiResponse(code=200, message = "Success",examples = @Example( value =  {
+            @ExampleProperty(mediaType="application/json",value = 
+                "\"status\" : \"SUCCESS\"}")
+        })),
+        @ApiResponse(code=400, message = "Failure",examples = @Example( value =  {
+            @ExampleProperty(mediaType="application/json",value = 
+                "{\"error\" : \"Unable to promote lock\","
+                + "\"status\" : \"FAILURE\"}") 
+        }))
+    })  
+    public Response promoteLock(
+            @ApiParam(value="Lock Id",required=true) @PathParam("lockId") String lockId,
+            @ApiParam(value = "Minor Version",required = false) @HeaderParam(XMINORVERSION) String minorVersion,
+            @ApiParam(value = "Patch Version",required = false) @HeaderParam(XPATCHVERSION) String patchVersion,
+            @ApiParam(value = "Authorization", required = true) @HeaderParam(MusicUtil.AUTHORIZATION) String authorization)
+                    throws Exception {
+        try { 
+            ResponseBuilder response = MusicUtil.buildVersionResponse(VERSION, minorVersion, patchVersion);
+            Map<String, Object> resultMap = MusicCore.validateLock(lockId);
+            if (resultMap.containsKey("Error")) {
+                logger.error(EELFLoggerDelegate.errorLogger,"", AppMessages.INCORRECTDATA  ,ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR);
+                response.status(Status.BAD_REQUEST);
+                return response.entity(new JsonResponse(ResultType.FAILURE).setError(String.valueOf(resultMap.get("Error"))).toMap()).build();
+            }
+            
+            String keyspaceName = (String) resultMap.get("keyspace");
+            EELFLoggerDelegate.mdcPut("keyspace", "( " + keyspaceName + " ) ");
+            
+            try {
+                ReturnType lockStatus = MusicCore.promoteLock(lockId);
+                if ( lockStatus.getResult().equals(ResultType.SUCCESS)) {
+                    response.status(Status.OK);
+                } else {
+                    response.status(Status.BAD_REQUEST);
+                }
+                return response.entity(new JsonResponse(lockStatus.getResult()).setLock(lockId).setMessage(lockStatus.getMessage()).toMap()).build();
+            } catch (Exception e) {
+                logger.error(EELFLoggerDelegate.errorLogger,AppMessages.INVALIDLOCK + lockId, ErrorSeverity.CRITICAL,
+                    ErrorTypes.LOCKINGERROR, e);
+                return response.status(Status.BAD_REQUEST).entity(new JsonResponse(ResultType.FAILURE).setError("Unable to promote lock").toMap()).build();
+            }
+        } finally {
+            EELFLoggerDelegate.mdcRemove("keyspace");
+        }
+    }
 }
 }
index 1ecb2ee..b322690 100644 (file)
@@ -133,15 +133,10 @@ public interface MusicCoreService {
 
     public void destroyLockRef(String lockId) throws MusicLockingException;
     
 
     public void destroyLockRef(String lockId) throws MusicLockingException;
     
-    //public MusicLockState destroyLockRef(String fullyQualifiedKey, String lockReference); // lock name, lock id
-
-    //public MusicLockState voluntaryReleaseLock(String fullyQualifiedKey, String lockReference)
-    //        throws MusicLockingException;// lock name,lock id
-
     public void deleteLock(String lockName) throws MusicLockingException;
     
     public void deleteLock(String lockName) throws MusicLockingException;
     
-    //public MusicLockState  forciblyReleaseLock(String fullyQualifiedKey, String lockReference) throws MusicLockingException, MusicServiceException, MusicQueryException;
-
+    public ReturnType promoteLock(String lockIdToPromote) throws MusicLockingException;
+    
     public List<String> getLockQueue(String fullyQualifiedKey)
         throws MusicServiceException, MusicQueryException, MusicLockingException;
     
     public List<String> getLockQueue(String fullyQualifiedKey)
         throws MusicServiceException, MusicQueryException, MusicLockingException;
     
index 0c30cc7..0d2e3f0 100644 (file)
@@ -158,6 +158,26 @@ public class MusicCassaCore implements MusicCoreService {
         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
         return lockReference;
     }
         logger.info(EELFLoggerDelegate.applicationLogger,"Time taken to create lock reference:" + (end - start) + " ms");
         return lockReference;
     }
+    
+    public ReturnType promoteLock(String lockId) throws MusicLockingException {
+        String[] splitString = lockId.split("\\.");
+        String keyspace = splitString[0].substring(1);//remove '$'
+        String table = splitString[1];
+        String primaryKeyValue = splitString[2].substring(0, splitString[2].lastIndexOf("$"));
+        String localFullyQualifiedKey = lockId.substring(1, lockId.lastIndexOf("$"));
+        String lockRef = lockId.substring(lockId.lastIndexOf("$")+1); //lockRef is "$" to end
+        
+        logger.info(EELFLoggerDelegate.applicationLogger,"Attempting to promote lock " + lockId);
+
+        try {
+            return getLockingServiceHandle().promoteLock(keyspace, table, primaryKeyValue, lockRef);
+        } catch (MusicServiceException e) {
+            throw new MusicLockingException("Unable to promote lock. ", e);
+        } catch (MusicQueryException e) {
+            throw new MusicLockingException("Unable to promote lock. ", e);
+        }
+        
+    }
 
 
     public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
 
 
     public ReturnType acquireLockWithLease(String fullyQualifiedKey, String lockReference, long leasePeriod)
index 3207214..7b116bc 100644 (file)
@@ -42,9 +42,10 @@ import java.util.UUID;
 import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
 import org.onap.music.datastore.MusicDataStore;
 import org.onap.music.datastore.PreparedQueryObject;
 import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
 import org.onap.music.datastore.MusicDataStore;
 import org.onap.music.datastore.PreparedQueryObject;
-
+import org.onap.music.lockingservice.cassandra.LockType;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
 
 public class CassandraCQL {
        public static final String createAdminKeyspace = "CREATE KEYSPACE admin WITH REPLICATION = "
 
 public class CassandraCQL {
        public static final String createAdminKeyspace = "CREATE KEYSPACE admin WITH REPLICATION = "
@@ -235,6 +236,9 @@ public class CassandraCQL {
         EmbeddedCassandraServerHelper.startEmbeddedCassandra();
         Cluster cluster = new Cluster.Builder().withoutJMXReporting().withoutMetrics().addContactPoint(address).withPort(9142).build();
         cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(5000);
         EmbeddedCassandraServerHelper.startEmbeddedCassandra();
         Cluster cluster = new Cluster.Builder().withoutJMXReporting().withoutMetrics().addContactPoint(address).withPort(9142).build();
         cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(5000);
+        EnumNameCodec<LockType> lockTypeCodec = new EnumNameCodec<LockType>(LockType.class);
+        cluster.getConfiguration().getCodecRegistry().register(lockTypeCodec);
+        
         Session session = cluster.connect();
         
         return new MusicDataStore(cluster, session);
         Session session = cluster.connect();
         
         return new MusicDataStore(cluster, session);
index 5669580..e9321d2 100644 (file)
@@ -480,6 +480,136 @@ public class TstRestMusicLockAPI {
         assertTrue( ((String)respMapCreate4.get("error")).toLowerCase().indexOf("deadlock") > -1 );
     }
 
         assertTrue( ((String)respMapCreate4.get("error")).toLowerCase().indexOf("deadlock") > -1 );
     }
 
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void test_lockPromotion() throws Exception {
+        System.out.println("Testing lock promotion");
+        createAndInsertIntoTable();
+        insertAnotherIntoTable();
+
+        // creates a lock 1
+        JsonLock jsonLock = createJsonLock(LockType.READ);
+        Response responseCreate1 = lock.createLockReference(lockName, "1", "1", authorization,
+                "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLock, "process1", appName);
+        Map<String, Object> respMapCreate1 = (Map<String, Object>) responseCreate1.getEntity();
+        String lockRefCreate1 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+
+        Response respMapPromote = lock.promoteLock(lockRefCreate1, "1", "1", authorization);
+        System.out.println("Status: " + respMapPromote.getStatus() + ". Entity " + respMapPromote.getEntity());
+        
+        assertEquals(200, respMapPromote.getStatus());
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void test_lockPromotionReadWrite() throws Exception {
+        System.out.println("Testing lock promotion with read and writes");
+        createAndInsertIntoTable();
+        insertAnotherIntoTable();
+
+        // creates a lock 1
+        JsonLock jsonLockRead = createJsonLock(LockType.READ);
+        Response responseCreate1 = lock.createLockReference(lockName, "1", "1", authorization,
+                "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+        Map<String, Object> respMapCreate1 = (Map<String, Object>) responseCreate1.getEntity();
+        String lockRefCreate1 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+        
+        JsonLock jsonLockWrite = createJsonLock(LockType.WRITE);
+        Response responseCreate2 = lock.createLockReference(lockName, "1", "1", authorization,
+                "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockWrite, "process1", appName);
+        Map<String, Object> respMapCreate2 = (Map<String, Object>) responseCreate2.getEntity();
+        String lockRefCreate2 = ((Map<String, String>) respMapCreate2.get("lock")).get("lock");
+
+        Response respMapPromote = lock.promoteLock(lockRefCreate1, "1", "1", authorization);
+        System.out.println("Status: " + respMapPromote.getStatus() + ". Entity " + respMapPromote.getEntity());
+        
+        assertEquals(200, respMapPromote.getStatus());
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void test_lockPromotionWriteRead() throws Exception {
+        System.out.println("Testing lock promotion with reads not at top of queue");
+        createAndInsertIntoTable();
+        insertAnotherIntoTable();
+
+        // creates a lock 1
+        JsonLock jsonLockWrite = createJsonLock(LockType.WRITE);
+        Response responseCreate2 = lock.createLockReference(lockName, "1", "1", authorization,
+                "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockWrite, "process1", appName);
+        Map<String, Object> respMapCreate2 = (Map<String, Object>) responseCreate2.getEntity();
+        String lockRefCreate2 = ((Map<String, String>) respMapCreate2.get("lock")).get("lock");
+        
+        // creates a lock 2
+        JsonLock jsonLockRead = createJsonLock(LockType.READ);
+        Response responseCreate1 = lock.createLockReference(lockName, "1", "1", authorization,
+                "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+        Map<String, Object> respMapCreate1 = (Map<String, Object>) responseCreate1.getEntity();
+        String lockRefCreate1 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+
+        Response respMapPromote = lock.promoteLock(lockRefCreate1, "1", "1", authorization);
+        System.out.println("Status: " + respMapPromote.getStatus() + ". Entity " + respMapPromote.getEntity());
+
+        assertEquals(200, respMapPromote.getStatus());
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void test_lockPromotion2Reads() throws Exception {
+        System.out.println("Testing lock promotion w/ 2 ReadLocks");
+        createAndInsertIntoTable();
+        insertAnotherIntoTable();
+
+        // creates a lock 1
+        JsonLock jsonLockRead = createJsonLock(LockType.READ);
+        Response responseCreate1 = lock.createLockReference(lockName, "1", "1", authorization,
+                "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+        Map<String, Object> respMapCreate1 = (Map<String, Object>) responseCreate1.getEntity();
+        String lockRefCreate1 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+        
+        Response responseCreate2 = lock.createLockReference(lockName, "1", "1", authorization,
+                "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+        Map<String, Object> respMapCreate2 = (Map<String, Object>) responseCreate1.getEntity();
+        String lockRefCreate2 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+
+        Response respMapPromote = lock.promoteLock(lockRefCreate1, "1", "1", authorization);
+        System.out.println("Status: " + respMapPromote.getStatus() + ". Entity " + respMapPromote.getEntity());
+        
+        assertEquals(400, respMapPromote.getStatus());
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Test
+    public void test_2lockPromotions() throws Exception {
+        System.out.println("Testing 2 lock promotions");
+        createAndInsertIntoTable();
+        insertAnotherIntoTable();
+
+        // creates a lock 1
+        JsonLock jsonLockRead = createJsonLock(LockType.READ);
+        Response responseCreate1 = lock.createLockReference(lockName, "1", "1", authorization,
+                "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+        Map<String, Object> respMapCreate1 = (Map<String, Object>) responseCreate1.getEntity();
+        String lockRefCreate1 = ((Map<String, String>) respMapCreate1.get("lock")).get("lock");
+        
+        Response responseCreate2 = lock.createLockReference(lockName, "1", "1", authorization,
+                "abcde001-d857-4e90-b1e5-df98a3d40ce6", jsonLockRead, "process1", appName);
+        Map<String, Object> respMapCreate2 = (Map<String, Object>) responseCreate2.getEntity();
+        String lockRefCreate2 = ((Map<String, String>) respMapCreate2.get("lock")).get("lock");
+
+        Response respMapPromote = lock.promoteLock(lockRefCreate1, "1", "1", authorization);
+        System.out.println("Status: " + respMapPromote.getStatus() + ". Entity " + respMapPromote.getEntity());
+        
+        assertEquals(400, respMapPromote.getStatus());
+        
+        Response respMap2Promote = lock.promoteLock(lockRefCreate2, "1", "1", authorization);
+        System.out.println("Status: " + respMap2Promote.getStatus() + ". Entity " + respMap2Promote.getEntity());
+        
+        assertEquals(400, respMapPromote.getStatus());
+    }
+    
+    
 
     // Ignoring since this is now a duplicate of delete lock ref.
     @Test
 
     // Ignoring since this is now a duplicate of delete lock ref.
     @Test