Bug fixes, syncronization, and clean up daemon 60/100560/1
authorTschaen, Brendan <ctschaen@att.com>
Tue, 21 Jan 2020 18:39:12 +0000 (13:39 -0500)
committerTschaen, Brendan <ctschaen@att.com>
Tue, 21 Jan 2020 18:39:12 +0000 (13:39 -0500)
Change-Id: Ic70bac496621540bd1589b7f8d07d8db43f3aed0
Issue-ID: MUSIC-548
Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
music-core/src/main/java/org/onap/music/datastore/MusicDataStore.java
music-core/src/main/java/org/onap/music/lockingservice/cassandra/CassaLockStore.java
music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java [new file with mode: 0644]
music-core/src/main/java/org/onap/music/main/MusicUtil.java
music-core/src/main/java/org/onap/music/service/MusicCoreService.java
music-core/src/main/java/org/onap/music/service/impl/MusicCassaCore.java
music-rest/src/main/java/org/onap/music/MusicApplication.java
music-rest/src/main/java/org/onap/music/conductor/conditionals/MusicConditional.java
music-rest/src/main/java/org/onap/music/main/PropertiesLoader.java
music-rest/src/test/java/org/onap/music/unittests/TstRestMusicDataAPI.java

index 7f6c42c..cb22c0f 100755 (executable)
@@ -374,22 +374,25 @@ public class MusicDataStore {
 
             ResultSet rs = session.execute(preparedInsert);
             result = rs.wasApplied();
-
-        }
-        catch (AlreadyExistsException ae) {
-            // logger.error(EELFLoggerDelegate.errorLogger,"AlreadExistsException: " + ae.getMessage(),AppMessages.QUERYERROR,
-            // ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-            throw new MusicQueryException("AlreadyExistsException: " + ae.getMessage(),ae);
-        } catch ( InvalidQueryException e ) {
-            // logger.error(EELFLoggerDelegate.errorLogger,"InvalidQueryException: " + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
-            // + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR);
-            throw new MusicQueryException("InvalidQueryException: " + e.getMessage(),e);
+        } catch (AlreadyExistsException ae) {
+            throw new MusicServiceException("Already Exists Exception: " + ae.getMessage());
+        } catch (InvalidQueryException e) {
+            if (e.getMessage().contains("unconfigured table")) {
+                throw new MusicServiceException("Invalid Query Exception: " + e.getMessage());
+            } else {
+                logger.info(EELFLoggerDelegate.applicationLogger, "Query Exception: " + e.getMessage(),
+                        AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.INFO,
+                        ErrorTypes.QUERYERROR, e);
+                throw new MusicServiceException("Query Exception: " + e.getMessage());
+            }
         } catch (Exception e) {
-            // logger.error(EELFLoggerDelegate.errorLogger,e.getClass().toString() + ":" + e.getMessage(),AppMessages.SESSIONFAILED + " [" 
-            //     + queryObject.getQuery() + "]", ErrorSeverity.ERROR, ErrorTypes.QUERYERROR, e);
-            throw new MusicServiceException("Executing Session Failure for Request = " + "["
-                    + queryObject.getQuery() + "]" + " Reason = " + e.getMessage(),e);
+            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),
+                    AppMessages.SESSIONFAILED + " [" + queryObject.getQuery() + "]", ErrorSeverity.ERROR,
+                    ErrorTypes.QUERYERROR, e);
+            throw new MusicServiceException("Executing Session Failure for Request = " + "[" + queryObject.getQuery()
+                    + "]" + " Reason = " + e.getMessage());
         }
+
         return result;
     }
 
index e953334..a727357 100644 (file)
@@ -50,7 +50,7 @@ import com.datastax.driver.extras.codecs.enums.EnumNameCodec;
 public class CassaLockStore {
     
     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassaLockStore.class);
-    private static String table_prepend_name = "lockQ_";
+    public static final String table_prepend_name = "lockQ_";
     private MusicDataStore dsHandle;
     
     public CassaLockStore() {
diff --git a/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java b/music-core/src/main/java/org/onap/music/lockingservice/cassandra/LockCleanUpDaemon.java
new file mode 100644 (file)
index 0000000..492a48f
--- /dev/null
@@ -0,0 +1,132 @@
+/*
+ * ============LICENSE_START==========================================
+ * org.onap.music
+ * ===================================================================
+ *  Copyright (c) 2019 AT&T Intellectual Property
+ * ===================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * 
+ * ============LICENSE_END=============================================
+ * ====================================================================
+ */
+
+package org.onap.music.lockingservice.cassandra;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.onap.music.datastore.MusicDataStoreHandle;
+import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.eelf.logging.EELFLoggerDelegate;
+import org.onap.music.exceptions.MusicQueryException;
+import org.onap.music.exceptions.MusicServiceException;
+import org.onap.music.main.MusicCore;
+import org.onap.music.main.MusicUtil;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+
+public class LockCleanUpDaemon extends Thread {
+    
+    boolean terminated = false;
+    private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(LockCleanUpDaemon.class);
+
+    
+    public LockCleanUpDaemon() {
+    }
+    
+    @Override
+    public void run() {
+        if (MusicUtil.getLockDaemonSleepTimeMs()<0) {
+            terminate();
+        }
+        while (!terminated) {
+            try {
+                cleanupStaleLocks();
+            } catch (MusicServiceException e) {
+                logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to clean up locks", e);
+            }
+            try {
+                Thread.sleep(MusicUtil.getLockDaemonSleepTimeMs());
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void cleanupStaleLocks() throws MusicServiceException {
+        Set<String> lockQTables = getLockQTables();
+        logger.info(EELFLoggerDelegate.applicationLogger, "Lock q tables found: " + lockQTables);
+        for(String lockTable: lockQTables) {
+            try {
+                cleanUpLocksFromTable(lockTable);
+            } catch (MusicServiceException e) {
+                logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to clear locks on table " + lockTable, e);
+            }
+        }
+    }
+
+
+    private Set<String> getLockQTables() throws MusicServiceException {
+        Set<String> keyspacesToCleanUp = MusicUtil.getKeyspacesToCleanLocks();
+        Set<String> lockQTables = new HashSet<>();
+        
+        PreparedQueryObject query = new PreparedQueryObject();
+        query.appendQueryString("SELECT keyspace_name, table_name FROM system_schema.tables;");
+        ResultSet results = MusicCore.get(query);
+        
+        for (Row row: results) {
+            if (keyspacesToCleanUp.contains(row.getString("keyspace_name"))
+                    && row.getString("table_name").toLowerCase().startsWith(CassaLockStore.table_prepend_name.toLowerCase()) ) {
+                lockQTables.add(row.getString("keyspace_name") + "." + row.getString("table_name"));
+            }
+        }
+        return lockQTables;
+    }
+
+    private void cleanUpLocksFromTable(String lockTable) throws MusicServiceException {
+        PreparedQueryObject query = new PreparedQueryObject();
+        query.appendQueryString("SELECT * from " + lockTable);
+        ResultSet results = MusicCore.get(query);
+        for (Row lock: results) {
+            if (!lock.isNull("lockreference")) {
+                try {
+                    deleteLockIfStale(lockTable, lock);
+                } catch (MusicServiceException e) {
+                    logger.warn(EELFLoggerDelegate.applicationLogger, "Unable to delete a potentially stale lock " + lock, e);
+                }
+            }
+        }
+    }
+    
+    
+    private void deleteLockIfStale(String lockTable, Row lock) throws MusicServiceException {
+        if (lock.isNull("createtime") && lock.isNull("acquiretime")) {
+            return;
+        }
+
+        long createTime = lock.isNull("createtime") ? 0 : Long.parseLong(lock.getString("createtime"));
+        long acquireTime = lock.isNull("acquiretime") ? 0 : Long.parseLong(lock.getString("acquiretime"));
+        long row_access_time = Math.max(createTime, acquireTime);
+        if (System.currentTimeMillis() > row_access_time + MusicUtil.getDefaultLockLeasePeriod()) {
+            logger.info(EELFLoggerDelegate.applicationLogger, "Stale lock detected and being removed: " + lock);
+            PreparedQueryObject query = new PreparedQueryObject();
+            query.appendQueryString("DELETE FROM " + lockTable + " WHERE key='" + lock.getString("key") + "' AND " +
+                    "lockreference=" + lock.getLong("lockreference") + " IF EXISTS;");
+            MusicDataStoreHandle.getDSHandle().getSession().execute(query.getQuery());
+        }
+    }
+
+    public void terminate() {
+        terminated = true;
+    }
+}
index 78d17c6..865ca01 100644 (file)
@@ -34,9 +34,11 @@ import java.io.FileNotFoundException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Scanner;
+import java.util.Set;
 import java.util.UUID;
 
 import javax.ws.rs.core.Response;
@@ -107,6 +109,8 @@ public class MusicUtil {
     private static boolean debug = true;
     private static String version = "0.0.0";
     private static String build = "";
+    private static long lockDaemonSleepms = 1000;
+    private static Set<String> keyspacesToCleanLocks = new HashSet<>();
 
     private static String musicPropertiesFilePath = PROPERTIES_FILE;
     // private static final String[] propKeys = new String[] { MusicUtil.class.getDeclaredMethod(arg0, )"build","cassandra.host", "debug",
@@ -160,7 +164,26 @@ public class MusicUtil {
     private static Boolean clientIdRequired = false;
     private static Boolean messageIdRequired = false;
     private static String cipherEncKey = "";
+    
+    private static long createLockWaitPeriod = 300;
+    private static int createLockWaitIncrement = 50;
+    
+    public static long getCreateLockWaitPeriod() {
+        return createLockWaitPeriod;
+    }
+    
+    public static void setCreateLockWaitPeriod(long createLockWaitPeriod) {
+        MusicUtil.createLockWaitPeriod = createLockWaitPeriod;
+    }
 
+    public static int getCreateLockWaitIncrement() {
+        return createLockWaitIncrement;
+    }
+    
+    public static void setCreateLockWaitIncrement(int createLockWaitIncrement) {
+        MusicUtil.createLockWaitIncrement = createLockWaitIncrement;
+    }
+    
     public MusicUtil() {
         throw new IllegalStateException("Utility Class");
     }
@@ -810,6 +833,27 @@ public class MusicUtil {
         MusicUtil.messageIdRequired = messageIdRequired;
     }
 
+    /**
+    *  @return the sleep time, in milliseconds, for the lock cleanup daemon
+    */
+    public static long getLockDaemonSleepTimeMs() {
+        return lockDaemonSleepms;
+    }
+    
+    /**
+    * set the sleep time, in milliseconds, for the lock cleanup daemon
+    */
+    public static void setLockDaemonSleepTimeMs(long timeoutms) {
+        MusicUtil.lockDaemonSleepms = timeoutms;
+    }
+
+    public static Set<String> getKeyspacesToCleanLocks() {
+        return keyspacesToCleanLocks;
+    }
+
+    public static void setKeyspacesToCleanLocks(Set<String> keyspaces) {
+        MusicUtil.keyspacesToCleanLocks = keyspaces;
+    }
 
     public static String getCipherEncKey() {
         return MusicUtil.cipherEncKey;
index 2fc8814..753d9b2 100644 (file)
@@ -97,7 +97,7 @@ public interface MusicCoreService {
      * @param owner the owner of the lock, for deadlock prevention
      */
     public String createLockReference(String fullyQualifiedKey, String owner) throws MusicLockingException;
-    
+
     /**
      * Create a lock ref in the music lock store
      * @param fullyQualifiedKey the key to create a lock on
index 63f2d14..e1416f8 100644 (file)
@@ -27,12 +27,13 @@ package org.onap.music.service.impl;
 
 import java.io.StringWriter;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.StringTokenizer;
-
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.ws.rs.core.MultivaluedMap;
 
 import org.onap.music.datastore.Condition;
@@ -75,7 +76,9 @@ public class MusicCassaCore implements MusicCoreService {
     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicCassaCore.class);
     private static MusicCassaCore musicCassaCoreInstance = null;
     private static Set<String> set = Collections.synchronizedSet(new HashSet<String>());
-
+    HashMap<String, Integer> map = new HashMap<>();
+    AtomicInteger wait = new AtomicInteger(0);
+    
     private MusicCassaCore() {
         // not going to happen
     }
@@ -119,10 +122,11 @@ public class MusicCassaCore implements MusicCoreService {
     public String createLockReference(String fullyQualifiedKey, String owner) throws MusicLockingException {
         return createLockReference(fullyQualifiedKey, LockType.WRITE, owner);
     }
-
+    
+    
     /**
      * This will be called for Atomic calls
-     * 
+     * it ensures that only one thread tries to create a lock on each key at a time
      */
     public String createLockReferenceAtomic(String fullyQualifiedKey, LockType locktype) throws MusicLockingException {
         String[] splitString = fullyQualifiedKey.split("\\.");
index 22c9e7b..0fe354d 100755 (executable)
@@ -33,6 +33,7 @@ import org.onap.music.authentication.CadiAuthFilter;
 import org.onap.music.authentication.MusicAuthorizationFilter;
 import org.onap.music.eelf.logging.EELFLoggerDelegate;
 import org.onap.music.eelf.logging.MusicLoggingServletFilter;
+import org.onap.music.lockingservice.cassandra.LockCleanUpDaemon;
 import org.onap.music.main.MusicUtil;
 import org.onap.music.main.PropertiesLoader;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -66,6 +67,10 @@ public class MusicApplication extends SpringBootServletInitializer {
 
     public static void main(String[] args) {
         new MusicApplication().configure(new SpringApplicationBuilder(MusicApplication.class)).run(args);
+        
+        LockCleanUpDaemon daemon = new LockCleanUpDaemon();
+        daemon.setDaemon(true);
+        daemon.start();
     }
 
     @Override
index ebaa3a1..2c69c43 100644 (file)
@@ -142,11 +142,20 @@ public class MusicConditional {
                     return new ReturnType(ResultType.FAILURE, e.getMessage());
                 }
                 if (results.all().isEmpty()) {
-                    MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.INSERT), "critical");
-                    return new ReturnType(ResultType.SUCCESS, "insert");
+                    PreparedQueryObject qObject = queryBank.get(MusicUtil.INSERT);
+                    qObject.setOperation(MusicUtil.INSERT);
+                    logger.info(EELFLoggerDelegate.debugLogger,"### Conditional Insert");
+                    MusicCore.criticalPut(keyspace, tableName, primaryKey, qObject, lockId, null);
+                    //MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.INSERT), "critical");
+                    return new ReturnType(ResultType.SUCCESS, MusicUtil.INSERT);
+
                 } else {
-                    MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.UPDATE), "critical");
-                    return new ReturnType(ResultType.SUCCESS, "update");
+                    PreparedQueryObject qObject = queryBank.get(MusicUtil.UPDATE);
+                    qObject.setOperation(MusicUtil.UPDATE);
+                    logger.info(EELFLoggerDelegate.debugLogger,"### Condition Update");
+                    MusicCore.criticalPut(keyspace, tableName, primaryKey, qObject, lockId, null);
+                    //MusicDataStoreHandle.getDSHandle().executePut(queryBank.get(MusicUtil.UPDATE), "critical");
+                    return new ReturnType(ResultType.SUCCESS, MusicUtil.UPDATE);
                 }
             } else {
                 return new ReturnType(ResultType.FAILURE,
@@ -214,13 +223,15 @@ public class MusicConditional {
                     JSONObject json = new JSONObject(updatedValues);
                     PreparedQueryObject update = new PreparedQueryObject();
                     String vector_ts = String.valueOf(Thread.currentThread().getId() + System.currentTimeMillis());
-                    update.appendQueryString("UPDATE " + dataObj.getKeyspace() + "." + dataObj.getTableName() + " SET " + dataObj.getCascadeColumnName() + "['" + dataObj.getPlanId()
+                    update.appendQueryString("UPDATE " + dataObj.getKeyspace() + "." + dataObj.getTableName() + " SET "
+                            + dataObj.getCascadeColumnName() + "['" + dataObj.getPlanId()
                             + "'] = ?, vector_ts = ? WHERE " + dataObj.getPrimaryKey() + " = ?");
                     update.addValue(MusicUtil.convertToActualDataType(DataType.text(), json.toString()));
                     update.addValue(MusicUtil.convertToActualDataType(DataType.text(), vector_ts));
                     update.addValue(MusicUtil.convertToActualDataType(DataType.text(), dataObj.getPrimaryKeyValue()));
                     try {
-                        MusicDataStoreHandle.getDSHandle().executePut(update, "critical");
+                        update.setOperation(MusicUtil.UPDATE);
+                        MusicCore.criticalPut(dataObj.keyspace, dataObj.tableName, dataObj.primaryKeyValue, update, dataObj.lockId, null);
                     } catch (Exception ex) {
                         logger.error(EELFLoggerDelegate.applicationLogger, ex);
                         return new ReturnType(ResultType.FAILURE, ex.getMessage());
@@ -228,9 +239,10 @@ public class MusicConditional {
                 }else {
                     return new ReturnType(ResultType.FAILURE,"Cannot find data related to key: "+dataObj.getPrimaryKey());
                 }
-                MusicDataStoreHandle.getDSHandle().executePut(dataObj.getQueryBank().get(MusicUtil.UPSERT), "critical");
+                PreparedQueryObject qObject = dataObj.getQueryBank().get(MusicUtil.UPSERT);
+                qObject.setOperation(MusicUtil.INSERT);
+                MusicCore.criticalPut(dataObj.keyspace, dataObj.tableName, dataObj.primaryKeyValue, qObject, dataObj.lockId, null);
                 return new ReturnType(ResultType.SUCCESS, "update success");
-
             } else {
                 return new ReturnType(ResultType.FAILURE,
                         "Cannot perform operation since you are the not the lock holder");
@@ -322,7 +334,7 @@ public class MusicConditional {
             counter = counter + 1;
         }
         queryObject.appendQueryString("INSERT INTO " + keySpaceName + "." + tableName + " "
-                + fieldsString + " VALUES " + valueString);
+                + fieldsString + " VALUES " + valueString + ";");
         return queryObject;
     }
     
index 6fbc76a..9c69b9e 100644 (file)
@@ -22,6 +22,7 @@
 
 package org.onap.music.main;
 
+import java.util.HashSet;
 import java.util.Properties;
 
 import org.onap.music.eelf.logging.EELFLoggerDelegate;
@@ -77,7 +78,19 @@ public class PropertiesLoader implements InitializingBean {
 
     @Value("${retry.count}")
     public String rertryCount;
+    
+    @Value("${lock.daemon.sleeptime.ms}")
+    public String lockDaemonSleeptimems;
 
+    @Value("${keyspaces.for.lock.cleanup}")
+    public String keyspacesForLockCleanup;
+
+    @Value("${create.lock.wait.period.ms}")
+    private long createLockWaitPeriod;
+
+    @Value("${create.lock.wait.increment.ms}")
+    private int createLockWaitIncrement;
+    
     @Value("${transId.header.prefix}")
     private String transIdPrefix;
 
@@ -172,6 +185,16 @@ public class PropertiesLoader implements InitializingBean {
         if (isKeyspaceActive != null && !isKeyspaceActive.equals("${keyspace.active}")) {
             MusicUtil.setKeyspaceActive(Boolean.parseBoolean(isKeyspaceActive));
         }
+        if (lockDaemonSleeptimems != null && !lockDaemonSleeptimems.equals("${lock.daemon.sleeptime.ms}")) {
+            MusicUtil.setLockDaemonSleepTimeMs(Long.parseLong(lockDaemonSleeptimems));
+        }
+        if (keyspacesForLockCleanup !=null && !keyspacesForLockCleanup.equals("${keyspaces.for.lock.cleanup}")) {
+            HashSet<String> keyspaces = new HashSet<>();
+            for (String keyspace: keyspacesForLockCleanup.split(",")) {
+                keyspaces.add(keyspace);
+            }
+            MusicUtil.setKeyspacesToCleanLocks(keyspaces);
+        }
         if(transIdPrefix!=null) {
             MusicUtil.setTransIdPrefix(transIdPrefix);
         }
@@ -203,6 +226,14 @@ public class PropertiesLoader implements InitializingBean {
         if(messageIdRequired!=null) {
             MusicUtil.setMessageIdRequired(messageIdRequired);
         }
+        
+        if(createLockWaitPeriod!=0) {
+            MusicUtil.setCreateLockWaitPeriod(createLockWaitPeriod);
+        }
+
+        if(createLockWaitIncrement!=0) {
+            MusicUtil.setCreateLockWaitIncrement(createLockWaitIncrement);
+        }
     }
 
     public static void loadProperties(Properties properties) {
index 407d032..ea3fb54 100644 (file)
@@ -322,7 +322,7 @@ public class TstRestMusicDataAPI {
         assertEquals(400, response2.getStatus());
         Map<String, String> respMap = (Map<String, String>) response2.getEntity();
         assertEquals(ResultType.FAILURE, respMap.get("status"));
-        assertEquals("AlreadyExistsException: Table " + keyspaceName + "." + tableNameDup + " already exists", respMap.get("error"));
+        assertEquals("Already Exists Exception: Table " + keyspaceName + "." + tableNameDup + " already exists", respMap.get("error"));
     }