Implement postgres, fixes to eventual, and many bug fixes 62/86562/9
authorArthur Martella <arthur.martella.1@att.com>
Mon, 29 Apr 2019 19:03:07 +0000 (15:03 -0400)
committerArthur Martella <arthur.martella.1@att.com>
Fri, 3 May 2019 14:04:56 +0000 (10:04 -0400)
Streamline upper and lower cases for Ranges
Fix initialization of system

Merged version of commit https://gerrit.onap.org/r/#/c/86160/

Change-Id: I169ed56ff79ff0a2e14ab9bc5e0467d1c0b9f0a9
Issue-ID: MUSIC-387
Signed-off-by: Arthur Martella <arthur.martella.1@att.com>
34 files changed:
mdbc-server/pom.xml
mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
mdbc-server/src/main/java/org/onap/music/mdbc/MDBCUtils.java
mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
mdbc-server/src/main/java/org/onap/music/mdbc/TestUtils.java [moved from mdbc-server/src/test/java/org/onap/music/mdbc/TestUtils.java with 100% similarity]
mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java [new file with mode: 0644]
mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json [new file with mode: 0644]
mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java [new file with mode: 0644]
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java [new file with mode: 0755]
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Utils.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/Operation.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreatePartition.java
mdbc-server/src/main/resources/mdbc.properties
mdbc-server/src/test/java/org/onap/music/mdbc/MDBCUtilsTest.java
mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java [new file with mode: 0644]
mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java [new file with mode: 0644]
mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java

index cdf7cff..7a46120 100755 (executable)
@@ -89,9 +89,9 @@
             <version>20160810</version>
         </dependency>
         <dependency>
-          <groupId>org.mariadb.jdbc</groupId>
-          <artifactId>mariadb-java-client</artifactId>
-          <version>2.3.0</version>
+            <groupId>org.mariadb.jdbc</groupId>
+            <artifactId>mariadb-java-client</artifactId>
+            <version>2.3.0</version>
         </dependency>
         <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
         <dependency>
             <artifactId>protobuf-java</artifactId>
             <version>3.6.1</version>
         </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.opentable.components</groupId>
+            <artifactId>otj-pg-embedded</artifactId>
+            <version>0.13.1</version>
+            <scope>test</scope>
+        </dependency>
+
+
     </dependencies>
 
     <build>
index ced5745..91b13f3 100755 (executable)
@@ -28,6 +28,8 @@ public class Configuration {
     public static final String KEY_DB_MIXIN_NAME    = "MDBC_DB_MIXIN";
     /** The property name to use to select the MUSIC 'mixin'. */
     public static final String KEY_MUSIC_MIXIN_NAME = "MDBC_MUSIC_MIXIN";
+    /** The property name to select if async staging table update is used */
+    public static final String KEY_ASYNC_STAGING_TABLE_UPDATE = "ASYNC_STAGING_TABLE_UPDATE";
     /** The name of the default mixin to use for the DBInterface. */
     public static final String DB_MIXIN_DEFAULT     = "mysql";//"h2";
     /** The name of the default mixin to use for the MusicInterface. */
@@ -44,4 +46,6 @@ public class Configuration {
     public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours
     /** The property name to provide comma separated list of ranges to warmup */
     public static final String KEY_WARMUPRANGES = "warmupranges";
+       /** Default async staging table update o ption*/
+       public static final String ASYNC_STAGING_TABLE_UPDATE = "false";
 }
index 32d456e..ae2b869 100755 (executable)
@@ -98,10 +98,25 @@ public class MDBCUtils {
        }
 
        public static List<Range> getTables(Map<String,List<SQLOperation>> queryParsed){
+               return getTables(null, queryParsed);
+       }
+
+       public static List<Range> getTables(String defaultDatabaseName, Map<String,List<SQLOperation>> queryParsed){
            List<Range> ranges = new ArrayList<>();
            for(String table: queryParsed.keySet()){
-              ranges.add(new Range(table));
-        }
+               String[] parts = table.split("\\.");
+               if(parts.length==2){
+                               ranges.add(new Range(table));
+                       }
+                       else if(parts.length==1 && defaultDatabaseName!=null){
+                               ranges.add(new Range(defaultDatabaseName+"."+table));
+               }
+               else{
+                       throw new IllegalArgumentException("Table should either have only one '.' or none at all, the table "
+                                       + "received is "+table);
+                       }
+
+               }
            return ranges;
     }
 }
\ No newline at end of file
index 3db6c3f..f1ac851 100755 (executable)
@@ -160,18 +160,7 @@ public class MdbcConnection implements Connection {
             if(progressKeeper!=null) progressKeeper.commitRequested(id);
             logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
             if (b) {
-                // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
-                if(id == null || id.isEmpty()) {
-                    logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
-                    throw new SQLException("tx id is null");
-                }
-                try {
-                    mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper);
-                } catch (MDBCServiceException e) {
-                    // TODO Auto-generated catch block
-                    logger.error("Cannot commit log to music" + e.getStackTrace());
-                    throw new SQLException(e.getMessage(), e);
-                }
+                musicCommit();
             }
             if(progressKeeper!=null) {
                 progressKeeper.setMusicDone(id);
@@ -191,13 +180,7 @@ public class MdbcConnection implements Connection {
         return jdbcConn.getAutoCommit();
     }
 
-    /**
-     * Perform a commit, as requested by the JDBC driver.  If any row updates have been delayed,
-     * they are performed now and copied into MUSIC.
-     * @throws SQLException
-     */
-    @Override
-    public void commit() throws SQLException {
+    private void musicCommit() throws SQLException {
         if(progressKeeper.isComplete(id)) {
             return;
         }
@@ -205,6 +188,7 @@ public class MdbcConnection implements Connection {
             progressKeeper.commitRequested(id);
         }
 
+        dbi.preCommitHook();
         try {
             logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
             // transaction was committed -- add all the updates into the REDO-Log in MUSIC
@@ -214,6 +198,16 @@ public class MdbcConnection implements Connection {
             logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
             throw new SQLException("Failure commiting to MUSIC", e);
         }
+    }
+
+    /**
+     * Perform a commit, as requested by the JDBC driver.  If any row updates have been delayed,
+     * they are performed now and copied into MUSIC.
+     * @throws SQLException
+     */
+    @Override
+    public void commit() throws SQLException {
+        musicCommit();
 
         if(progressKeeper != null) {
             progressKeeper.setMusicDone(id);
@@ -273,9 +267,10 @@ public class MdbcConnection implements Connection {
         } catch (MDBCServiceException e) {
             throw new SQLException("Failure during relinquish of partition",e);
         }
-       // Warning! Make sure this call remains AFTER the call to jdbcConn.close(),
-       // otherwise you're going to get stuck in an infinite loop.
-       statemanager.closeConnection(id);
+
+        // Warning! Make sure this call remains AFTER the call to jdbcConn.close(),
+        // otherwise you're going to get stuck in an infinite loop.
+        statemanager.closeConnection(id);
     }
 
     @Override
@@ -507,7 +502,8 @@ public class MdbcConnection implements Connection {
         //Parse tables from the sql query
         Map<String, List<SQLOperation>> tableToInstruction = QueryProcessor.parseSqlQuery(sql);
         //Check ownership of keys
-        List<Range> queryTables = MDBCUtils.getTables(tableToInstruction);
+        String defaultSchema = dbi.getSchema();
+        List<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToInstruction);
         if (this.partition!=null) {
             List<Range> snapshot = this.partition.getSnapshot();
             if(snapshot!=null){
@@ -550,18 +546,15 @@ public class MdbcConnection implements Connection {
         logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
         for (String tableName : set1) {
             // This map will be filled in if this table was previously discovered
-            tableName = tableName.toUpperCase();
-            if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
+            if (!table_set.contains(tableName.toUpperCase()) && !dbi.getReservedTblNames().contains(tableName.toUpperCase())) {
                 logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
                 try {
                     TableInfo ti = dbi.getTableInfo(tableName);
-                    mi.initializeMusicForTable(ti,tableName);
-                    //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
-                    ti = dbi.getTableInfo(tableName);
-                    mi.createDirtyRowTable(ti,tableName);
+                    //mi.initializeMusicForTable(ti,tableName);
+                    //mi.createDirtyRowTable(ti,tableName);
                     dbi.createSQLTriggers(tableName);
-                    table_set.add(tableName);
-                    dbi.synchronizeData(tableName);
+                    table_set.add(tableName.toUpperCase());
+                    //dbi.synchronizeData(tableName);
                     logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
                         table_set.size() + "/" + set1.size() + "tables uploaded");
                 } catch (Exception e) {
index 42b9710..500ed81 100755 (executable)
@@ -80,6 +80,15 @@ public class MdbcServer {
        Properties connectionProps = new Properties();
        connectionProps.put("user", user);
        connectionProps.put("password", password);
+       String defaultMusicMixin = Utils.getDefaultMusicMixin();
+       if(defaultMusicMixin!=null){
+               connectionProps.put(Configuration.KEY_MUSIC_MIXIN_NAME,defaultMusicMixin);
+               }
+       String defaultDBMixin = Utils.getDefaultDBMixin();
+               if(defaultMusicMixin!=null){
+               connectionProps.put(Configuration.KEY_DB_MIXIN_NAME,defaultDBMixin);
+               }
+               Utils.registerDefaultDrivers();
        meta = new MdbcServerLogic(url,connectionProps,config);
        LocalService service = new LocalService(meta);
 
index 16e7170..41aed26 100755 (executable)
@@ -23,6 +23,9 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Objects;
 
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.mixins.MusicMixin;
+
 
 /**
  * This class represent a range of the whole database 
@@ -32,13 +35,21 @@ import java.util.Objects;
  */
 public class Range implements Cloneable{
 
+    private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Range.class);
+
        private String table;
 
        public Range(String table) {
-               this.table = table.toUpperCase();
+               final String[] split = table.split("\\.");
+               if(split.length!=2){
+                       logger.debug("Table should contain schema, received in constructor: " + table);
+//                     throw new IllegalArgumentException("Table should always contain the schema, table received in "
+//                             + "constructor is "+table);
+               }
+               this.table = table;
        }
 
-       public String toString(){return table.toUpperCase();}
+       public String toString(){return table;}
 
        /**
         * Compares to Range types
@@ -55,7 +66,7 @@ public class Range implements Cloneable{
 
        @Override
        public int hashCode(){
-        return table.hashCode();
+        return table.toUpperCase().hashCode();
        }
 
        @Override
@@ -74,11 +85,11 @@ public class Range implements Cloneable{
 
     public static boolean overlaps(List<Range> ranges, String table){
                //\TODO check if parallel stream makes sense here
-        return ranges.stream().map((Range r) -> r.table.equals(table)).anyMatch((Boolean b) -> b);
+        return ranges.stream().map((Range r) -> r.table.toUpperCase().equals(table.toUpperCase())).anyMatch((Boolean b) -> b);
        }
 
        public boolean overlaps(Range other) {
-               return table.equals(other.table);
+               return table.toUpperCase().equals(other.table.toUpperCase());
        }
 
     public String getTable() {
index e4c4a24..18bc0db 100644 (file)
@@ -21,7 +21,6 @@ package org.onap.music.mdbc;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.onap.music.exceptions.MDBCServiceException;
@@ -47,6 +46,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -102,10 +102,10 @@ public class StateManager {
     public StateManager() {
     }
     
-       public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
+       public StateManager(String sqlDBUrl, Properties newInfo, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
         this.sqlDBName = sqlDBName;
-        this.sqlDBUrl = sqlDBUrl;
-        this.info = info;
+        this.sqlDBUrl = cleanSqlUrl(sqlDBUrl);
+        this.info = new Properties();
         this.mdbcServerName = mdbcServerName;
     
         this.connectionRanges = new ConcurrentHashMap<>();
@@ -117,6 +117,7 @@ public class StateManager {
                } catch (IOException e) {
                        logger.error(EELFLoggerDelegate.errorLogger, e.getMessage());
                }
+               info.putAll(newInfo);
         cassandraUrl = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
         musicmixin = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
         
@@ -129,6 +130,16 @@ public class StateManager {
         ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
     }
 
+    protected String cleanSqlUrl(String url){
+           if(url!=null) {
+            url = url.trim();
+            if (url.length() > 0 && url.charAt(url.length() - 1) == '/') {
+                url= url.substring(0, url.length() - 1);
+            }
+        }
+        return url;
+    }
+
     protected void initTxDaemonThread(){
         txDaemon = new Thread(
             new MusicTxDigestDaemon(Integer.parseInt(
@@ -149,27 +160,21 @@ public class StateManager {
     }
     
     protected void initSqlDatabase() throws MDBCServiceException {
-        try {
-            //\TODO: pass the driver as a variable
-            Class.forName("org.mariadb.jdbc.Driver");
-        }
-        catch (ClassNotFoundException e) {
-            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
-                    ErrorTypes.GENERALSERVICEERROR);
-            return;
-        }
-        try {
-            Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
-            StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ")
+        if(!this.sqlDBUrl.toLowerCase().startsWith("jdbc:postgresql")) {
+            try {
+                Connection sqlConnection = DriverManager.getConnection(this.sqlDBUrl, this.info);
+                StringBuilder sql = new StringBuilder("CREATE DATABASE IF NOT EXISTS ")
                     .append(sqlDBName)
                     .append(";");
-            Statement stmt = sqlConnection.createStatement();
-            stmt.execute(sql.toString());
-            sqlConnection.close();
-        } catch (SQLException e) {
-            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
+                Statement stmt = sqlConnection.createStatement();
+                stmt.execute(sql.toString());
+                sqlConnection.close();
+            } catch (SQLException e) {
+                logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR,
+                    ErrorSeverity.CRITICAL,
                     ErrorTypes.GENERALSERVICEERROR);
-            throw new MDBCServiceException(e.getMessage(), e);
+                throw new MDBCServiceException(e.getMessage(), e);
+            }
         }
     }
     
@@ -306,17 +311,9 @@ public class StateManager {
        public Connection openConnection(String id) {
                Connection sqlConnection;
        MdbcConnection newConnection;
-        try {
-            //TODO: pass the driver as a variable
-            Class.forName("org.mariadb.jdbc.Driver");
-        }
-        catch (ClassNotFoundException e) {
-            // TODO Auto-generated catch block
-            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.QUERYERROR, ErrorSeverity.CRITICAL,
-                    ErrorTypes.QUERYERROR);
-        }
-
+        Utils.registerDefaultDrivers();
         //Create connection to local SQL DB
+
                try {
                        sqlConnection = DriverManager.getConnection(this.sqlDBUrl+"/"+this.sqlDBName, this.info);
                } catch (SQLException e) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java b/mdbc-server/src/main/java/org/onap/music/mdbc/Utils.java
new file mode 100644 (file)
index 0000000..7a09dca
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import org.onap.music.logging.EELFLoggerDelegate;
+
+public class Utils {
+
+
+    public static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Utils.class);
+
+    static Properties retrieveMdbcProperties() {
+        Properties pr = null;
+        try {
+            pr = new Properties();
+            pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
+        } catch (IOException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, "Could not load property file: " + e.getMessage());
+        }
+        return pr;
+    }
+
+    public static String getDefaultMusicMixin() {
+        Properties pr = retrieveMdbcProperties();
+        if (pr == null)
+            return null;
+        String defaultMusicMixin = pr.getProperty("DEFAULT_MUSIC_MIXIN");
+        return defaultMusicMixin;
+    }
+
+    public static String getDefaultDBMixin() {
+        Properties pr = retrieveMdbcProperties();
+        if (pr == null)
+            return null;
+        String defaultMusicMixin = pr.getProperty("DEFAULT_DB_MIXIN");
+        return defaultMusicMixin;
+    }
+
+    public static void registerDefaultDrivers() {
+        Properties pr = null;
+        try {
+            pr = new Properties();
+            pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
+        } catch (IOException e) {
+            logger.error("Could not load property file > " + e.getMessage());
+        }
+
+        String drivers = pr.getProperty("DEFAULT_DRIVERS");
+        for (String driver : drivers.split("[ ,]")) {
+            logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'");
+            try {
+                Class.forName(driver.trim());
+            } catch (ClassNotFoundException e) {
+                logger.error(EELFLoggerDelegate.errorLogger, "Driver class " + driver + " not found.");
+            }
+        }
+    }
+}
index 38309d5..391ee1a 100755 (executable)
@@ -41,7 +41,7 @@ public class NodeConfiguration {
     public String nodeName;
     public String sqlDatabaseName;
 
-    public NodeConfiguration(String tables, String eventualTables, UUID mriIndex, String sqlDatabaseName, String node){
+    public NodeConfiguration(String tables, List<String> eventualTables, UUID mriIndex, String sqlDatabaseName, String node){
         //     public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) {
         partition = new DatabasePartition(toRanges(tables), mriIndex, null) ;
         eventual = new Eventual(toRanges(eventualTables));
@@ -49,16 +49,20 @@ public class NodeConfiguration {
         this.sqlDatabaseName = sqlDatabaseName;
     }
 
+    protected List<Range> toRanges(List<String> tables){
+        List<Range> newRange = new ArrayList<>();
+         for(String table: tables) {
+            newRange.add(new Range(table));
+        }
+        return newRange;
+    }
+
     protected List<Range> toRanges(String tables){
         if(tables.isEmpty()){
             return new ArrayList<>();
         }
-        List<Range> newRange = new ArrayList<>();
         String[] tablesArray=tables.split(",");
-        for(String table: tablesArray) {
-            newRange.add(new Range(table));
-        }
-        return newRange;
+        return toRanges(new ArrayList<>(Arrays.asList(tablesArray)));
     }
 
     public String toJson() {
index 343a8b8..0598271 100755 (executable)
@@ -26,6 +26,7 @@ import org.onap.music.mdbc.MDBCUtils;
 import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.mixins.MusicMixin;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
 
 import com.google.gson.Gson;
 import org.onap.music.datastore.PreparedQueryObject;
@@ -46,6 +47,7 @@ public class TablesConfiguration {
 
     private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class);
     private List<PartitionInformation> partitions;
+    private List<String> eventual;
     String tableToPartitionName;
     private String musicNamespace;
     private String partitionInformationTableName;
@@ -83,16 +85,17 @@ public class TablesConfiguration {
 
             logger.info("Creating empty row with id "+partitionId);
             MusicMixin.createEmptyMriRow(musicNamespace,partitionInfo.mriTableName,UUID.fromString(partitionId),
-                partitionInfo.owner,null,partitionInfo.getTables(),true);
+                partitionInfo.owner,null,
+                partitionInfo.getTables(),true);
 
             //3) Create config for this node
             StringBuilder newStr = new StringBuilder();
             for(Range r: partitionInfo.tables){
-                newStr.append(r.toString().toUpperCase()).append(",");
+                newStr.append(r.toString()).append(",");
             }
 
             logger.info("Appending row to configuration "+partitionId);
-            nodeConfigs.add(new NodeConfiguration(newStr.toString(),"",UUID.fromString(partitionId),
+            nodeConfigs.add(new NodeConfiguration(newStr.toString(),eventual,UUID.fromString(partitionId),
                        sqlDatabaseName, partitionInfo.owner));
         }
         return nodeConfigs;
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/clusterConfiguration.json
new file mode 100644 (file)
index 0000000..430525f
--- /dev/null
@@ -0,0 +1,11 @@
+{
+  "internalNamespace": "music_internal",
+  "internalReplicationFactor": 1,
+  "musicNamespace": "namespace",
+  "musicReplicationFactor": 1,
+  "mriTableName": "musicrangeinformation",
+  "mtxdTableName": "musictxdigest",
+  "eventualMtxdTableName":"musicevetxdigest",
+  "nodeInfoTableName":"nodeinfo",
+  "rangeDependencyTableName":"musicrangedependency"
+}
index 8cbbfec..559d597 100755 (executable)
@@ -3,21 +3,19 @@
     {
       "tables": [
         {
-          "table": "PERSONS"
+          "table": "public.PERSONS"
         }
       ],
       "owner": "",
       "mriTableName": "musicrangeinformation",
-      "mtxdTableName": "musictxdigest",
       "partitionId": ""
     }
   ],
-  "internalNamespace": "music_internal",
-  "internalReplicationFactor": 1,
+  "eventual": ["public.eventualPERSONS"],
   "musicNamespace": "namespace",
-  "musicReplicationFactor": 1,
   "tableToPartitionName": "tabletopartition",
   "partitionInformationTableName": "partitioninfo",
   "redoHistoryTableName": "redohistory",
   "sqlDatabaseName": "test"
 }
+
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/AsyncUpdateHandler.java
new file mode 100644 (file)
index 0000000..543cbd0
--- /dev/null
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.mixins;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.websocket.RemoteEndpoint.Async;
+import org.onap.music.logging.EELFLoggerDelegate;
+
+public class AsyncUpdateHandler {
+
+    private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(AsyncUpdateHandler.class);
+
+    Object finishMonitor;
+    Object taskMonitor;
+    AtomicBoolean pendingUpdate;
+    Runnable handler;
+
+    AsyncUpdateHandler(Runnable handlerToRun){
+        handler=handlerToRun;
+        finishMonitor=new Object();
+        taskMonitor = new Object();
+        pendingUpdate=new AtomicBoolean(false);
+        createUpdateExecutor();
+    }
+
+    void createUpdateExecutor(){
+        Runnable newRunnable = ()-> UpdateExecutor();
+        Thread newThread = new Thread(newRunnable);
+        newThread.setDaemon(true);
+        newThread.start();
+    }
+
+    public void processNewUpdate(){
+        pendingUpdate.set(true);
+        synchronized (taskMonitor) {
+            taskMonitor.notifyAll();
+        }
+    }
+
+    void UpdateExecutor(){
+        while(true) {
+            synchronized (taskMonitor) {
+                try {
+                    if(!pendingUpdate.get()){
+                        taskMonitor.wait();
+                    }
+                } catch (InterruptedException e) {
+                    logger.error("Update Executor received an interrup exception");
+                }
+            }
+            startUpdate();
+        }
+    }
+
+    void startUpdate(){
+        synchronized (finishMonitor) {
+            //Keep running until there are no more requests
+            while (pendingUpdate.getAndSet(false)) {
+                if(handler!=null){
+                    handler.run();
+                }
+            }
+            finishMonitor.notifyAll();
+        }
+
+    }
+
+    public void waitForAllPendingUpdates(){
+        //Wait until there are no more requests and the thread is not longer running
+        //We could use a join, but given that the thread could be temporally null, then it is easier to
+        //separate the wait from the thread that is running
+        synchronized(finishMonitor){
+            while(pendingUpdate.get()) {
+                try {
+                    finishMonitor.wait();
+                } catch (InterruptedException e) {
+                    logger.error("waitForAllPendingUpdate received exception");
+                }
+            }
+        }
+    }
+}
index a594918..dd71d97 100755 (executable)
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.TableInfo;
@@ -111,6 +112,13 @@ public interface DBInterface {
         * @return a ResultSet containing the rows returned from the query
         */
        ResultSet executeSQLRead(String sql);
+
+       /**
+        * This method is used to verify that all data structures needed for commit are ready for commit
+        * Either for the MusicMixin (e.g. staging table) or internally for the DBMixin
+        * Important this is not for actual commit operation, it should be treated as a signal.
+        */
+       void preCommitHook();
        
        void synchronizeData(String tableName);
        
@@ -133,4 +141,6 @@ public interface DBInterface {
        void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException;
 
        Connection getSQLConnection();
+
+       String getSchema();
 }
index d822615..324b0f6 100755 (executable)
@@ -61,12 +61,13 @@ public class MixinFactory {
                                                con = cl.getConstructor(MusicInterface.class, String.class, Connection.class, Properties.class);
                                                if (con != null) {
                                                        logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname);
-                                                       return (DBInterface) con.newInstance(mi, url, conn, info);
+                                                       DBInterface newdbi = (DBInterface) con.newInstance(mi, url, conn, info);
+                                                       return newdbi;
                                                }
                                        }
                                }
                        } catch (Exception e) {
-                               logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface: "+e);
+                               logger.error(EELFLoggerDelegate.errorLogger,"createDBInterface error for "+cl.getName(),e);
                        }
                }
                return null;
index 71f1b8b..fc8897f 100755 (executable)
@@ -303,8 +303,8 @@ public interface MusicInterface {
     void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException;
 
     List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
-    
 
+    
     void deleteMriRow(MusicRangeInformationRow row) throws MDBCServiceException;
 
     void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
index cb37a55..5a322f3 100644 (file)
@@ -61,7 +61,6 @@ import org.onap.music.mdbc.StateManager;
 import org.onap.music.mdbc.TableInfo;
 import org.onap.music.mdbc.ownership.Dag;
 import org.onap.music.mdbc.ownership.DagNode;
-import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
 import org.onap.music.mdbc.tables.MriReference;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
 import org.onap.music.mdbc.tables.MusicTxDigestId;
@@ -99,6 +98,8 @@ public class MusicMixin implements MusicInterface {
     public static final String KEY_MUSIC_RFACTOR      = "music_rfactor";
     /** The property name to use to provide the replication factor for Cassandra. */
     public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
+    /**  The property name to use to provide a timeout to mdbc (ownership) */
+    public static final String KEY_TIMEOUT = "mdbc_timeout";
     /**  The property name to use to provide a flag indicating if compression is required */
     public static final String KEY_COMPRESSION = "mdbc_compression";
     /** Namespace for the tables in MUSIC (Cassandra) */
@@ -106,13 +107,15 @@ public class MusicMixin implements MusicInterface {
     /** The default property value to use for the Cassandra IP address. */
     public static final String DEFAULT_MUSIC_ADDRESS  = "localhost";
     /** The default property value to use for the Cassandra replication factor. */
-    public static final int    DEFAULT_MUSIC_RFACTOR  = 1;
+    public static final int    DEFAULT_MUSIC_RFACTOR  = 3;
+    /** The default property value to use for the MDBC timeout */
+    public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
     /** The default primary string column, if none is provided. */
     public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
     /** Type of the primary key, if none is defined by the user */
     public static final String MDBC_PRIMARYKEY_TYPE = "uuid";
     public static final boolean DEFAULT_COMPRESSION = true;
-
+    //TODO: Control network topology strategy with a configuration file entry
     public static final boolean ENABLE_NETWORK_TOPOLOGY_STRATEGY = false;
 
     //\TODO Add logic to change the names when required and create the tables when necessary
@@ -254,6 +257,7 @@ public class MusicMixin implements MusicInterface {
     public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException {
         Map<String,Object> replicationInfo = new HashMap<>();
         replicationInfo.put("'class'", "'NetworkTopologyStrategy'");
+
         if (ENABLE_NETWORK_TOPOLOGY_STRATEGY && replicationFactor==3) {
             replicationInfo.put("'dc1'", 1);
             replicationInfo.put("'dc2'", 1);
@@ -673,7 +677,7 @@ public class MusicMixin implements MusicInterface {
         }
         if(rs!=null) {
             for (Row row : rs) {
-                set.add(row.getString("TABLE_NAME").toUpperCase());
+                set.add(row.getString("TABLE_NAME"));
             }
         }
         return set;
@@ -1295,15 +1299,13 @@ public class MusicMixin implements MusicInterface {
             logger.warn("Trying tcommit log with null partition");
             return;
         }
+
         List<Range> snapshot = partition.getSnapshot();
         if(snapshot==null || snapshot.isEmpty()){
             logger.warn("Trying to commit log with empty ranges");
             return;
         }
 
-        // first deal with commit for eventually consistent tables
-        filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
         UUID mriIndex = partition.getMRIIndex();
         String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
         //0. See if reference to lock was already created
@@ -1475,7 +1477,7 @@ public class MusicMixin implements MusicInterface {
         for(TupleValue t: log){
             //final String tableName = t.getString(0);
             final UUID id = t.getUUID(1);
-            digestIds.add(new MusicTxDigestId(id,index++));
+            digestIds.add(new MusicTxDigestId(partitionIndex,id,index++));
         }
         List<Range> partitions = new ArrayList<>();
         Set<String> tables = newRow.getSet("keys",String.class);
@@ -1526,11 +1528,14 @@ public class MusicMixin implements MusicInterface {
         pQueryObject.appendQueryString(cql);
         pQueryObject.addValue(baseRange.getTable());
         Row newRow;
+        //TODO Change this when music fix the "." problem in the primary key
+        final String table = baseRange.getTable();
+        final String tableWithoutDot = table.replaceAll("\\.","");
         try {
-            newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.getTable(),null);
+            newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,tableWithoutDot,null);
         } catch (MDBCServiceException e) {
-            logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
-            throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
+            logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName+" trying for table "+tableWithoutDot);
+            throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information for table "+tableWithoutDot, e);
         }
         return getRangeDependenciesFromCassandraRow(newRow);
     }
@@ -1838,7 +1843,6 @@ public class MusicMixin implements MusicInterface {
         PreparedQueryObject query = new PreparedQueryObject();
         int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
         
-        
         String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns,
                 this.musicEventualTxDigestTableName);
             query.appendQueryString(cql);
@@ -1856,6 +1860,7 @@ public class MusicMixin implements MusicInterface {
         }
     }
 
+
     @Override
     public StagingTable getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
         String cql = String.format("SELECT * FROM %s.%s WHERE txid = ?;", music_ns, musicTxDigestTableName);
@@ -1883,8 +1888,7 @@ public class MusicMixin implements MusicInterface {
         }
         return changes;
     }
-    
-    
+
     @Override
     public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException {
         int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
@@ -1901,7 +1905,7 @@ public class MusicMixin implements MusicInterface {
         LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>();
         UUID musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName);
         PreparedQueryObject pQueryObject = new PreparedQueryObject();
-        
+
         if (musicevetxdigestNodeinfoTimeID != null) {
             // this will fetch only few records based on the time-stamp condition.
             cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
@@ -1912,7 +1916,7 @@ public class MusicMixin implements MusicInterface {
             cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
             pQueryObject.appendQueryString(cql);
         }
-        
+
         // I need to get a ResultSet of all the records and give each row to the below HashMap.
         ResultSet rs = executeMusicRead(pQueryObject);
         while (!rs.isExhausted()) {
@@ -1920,8 +1924,8 @@ public class MusicMixin implements MusicInterface {
             ByteBuffer digest = row.getBytes("transactiondigest");
             Boolean compressed = row.getBool("compressed");
             //String txTimeId = row.getString("txtimeid"); //???
-            UUID txTimeId = row.getUUID("txtimeid");    
-            
+            UUID txTimeId = row.getUUID("txtimeid");
+
             try {
                 if(compressed){
                     digest=StagingTable.Decompress(digest);
@@ -1932,7 +1936,7 @@ public class MusicMixin implements MusicInterface {
                 throw e;
             }
             ecDigestInformation.put(txTimeId, changes);
-        }      
+        }
         return ecDigestInformation;
     }
 
@@ -2218,7 +2222,7 @@ public class MusicMixin implements MusicInterface {
      * @param cql the CQL to be sent to Cassandra
      */
     private static void executeMusicWriteQuery(String keyspace, String table, String cql)
-        throws MDBCServiceException {
+            throws MDBCServiceException {
         PreparedQueryObject pQueryObject = new PreparedQueryObject();
         pQueryObject.appendQueryString(cql);
         ResultType rt = null;
@@ -2254,6 +2258,9 @@ public class MusicMixin implements MusicInterface {
                 throw new MDBCServiceException("Error executing atomic get", e);
             }
         }
+        if(result==null){
+            throw new MDBCServiceException("Error executing atomic get for primary key: "+primaryKey);
+        }
         if(result.isExhausted()){
             return null;
         }
@@ -2418,11 +2425,17 @@ public class MusicMixin implements MusicInterface {
         pQueryObject.addValue(row.getPartitionIndex());
         ReturnType rt ;
         try {
-            rt = MusicCore.atomicPut(music_ns, musicRangeDependencyTableName, row.getPartitionIndex().toString(),
+            rt = MusicCore.atomicPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(),
                 pQueryObject, null);
         } catch (MusicLockingException|MusicQueryException|MusicServiceException e) {
             logger.error("Failure when deleting mri row");
             new MDBCServiceException("Error deleting mri row",e);
         }
     }
+
+       public StateManager getStateManager() {
+               return stateManager;
+       }
+    
+    
 }
index 6d6e691..15caf3f 100755 (executable)
@@ -40,6 +40,7 @@ import org.json.JSONTokener;
 
 import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Configuration;
 import org.onap.music.mdbc.MDBCUtils;
 import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.TableInfo;
@@ -75,7 +76,7 @@ public class MySQLMixin implements DBInterface {
        public static final String TRANS_TBL = "MDBC_TRANSLOG";
        private static final String CREATE_TBL_SQL =
                "CREATE TABLE IF NOT EXISTS "+TRANS_TBL+
-               " (IX INT AUTO_INCREMENT, OP CHAR(1), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " +
+               " (IX INT AUTO_INCREMENT, OP CHAR(1), SCHEMANAME VARCHAR(255), TABLENAME VARCHAR(255),KEYDATA VARCHAR(1024), ROWDATA VARCHAR(1024), " +
                        "CONNECTION_ID INT, PRIMARY KEY (IX));";
 
        private final MusicInterface mi;
@@ -85,6 +86,10 @@ public class MySQLMixin implements DBInterface {
        private final Map<String, TableInfo> tables;
        private PreparedStatement deleteStagingStatement;
        private boolean server_tbl_created = false;
+       private boolean useAsyncStagingUpdate = false;
+       private Object stagingHandlerLock = new Object();
+       private AsyncUpdateHandler stagingHandler = null;
+       private StagingTable currentStaging=null;
 
        public MySQLMixin() {
                this.mi = null;
@@ -100,12 +105,35 @@ public class MySQLMixin implements DBInterface {
                this.dbName = getDBName(conn);
                this.jdbcConn = conn;
                this.tables = new HashMap<String, TableInfo>();
+               useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
+                                                                                                                               Configuration.ASYNC_STAGING_TABLE_UPDATE));
+       }
+
+       class StagingTableUpdateRunnable implements Runnable{
+
+               private MySQLMixin mixin;
+               private StagingTable staging;
+
+           StagingTableUpdateRunnable(MySQLMixin mixin, StagingTable staging){
+               this.mixin=mixin;
+               this.staging=staging;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               this.mixin.updateStagingTable(staging);
+                       } catch (NoSuchFieldException|MDBCServiceException e) {
+                           this.mixin.logger.error("Error when updating the staging table");
+                       }
+               }
        }
 
        private PreparedStatement getStagingDeletePreparedStatement() throws SQLException {
                return jdbcConn.prepareStatement("DELETE FROM "+TRANS_TBL+" WHERE (IX BETWEEN ? AND ? ) AND " +
                                        "CONNECTION_ID = ?;");
        }
+
        // This is used to generate a unique connId for this connection to the DB.
        private int generateConnID(Connection conn) {
                int rv = (int) System.currentTimeMillis();      // random-ish
@@ -156,11 +184,19 @@ public class MySQLMixin implements DBInterface {
                }
                return dbname;
        }
-       
+
+       @Override
        public String getDatabaseName() {
                return this.dbName;
        }
 
+       @Override
+       public String getSchema() {return this.dbName;}
+
+       /**
+        * Get a set of the table names in the database.
+        * @return the set
+        */
        @Override
        public Set<String> getSQLTableSet() {
                Set<String> set = new TreeSet<String>();
@@ -183,7 +219,7 @@ public class MySQLMixin implements DBInterface {
        @Override
     public Set<Range> getSQLRangeSet() {
         Set<String> set = new TreeSet<String>();
-        String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+        String sql = "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
         try {
             Statement stmt = jdbcConn.createStatement();
             ResultSet rs = stmt.executeQuery(sql);
@@ -244,9 +280,19 @@ mysql> describe tables;
                TableInfo ti = tables.get(tableName);
                if (ti == null) {
                        try {
-                               String tbl = tableName;//.toUpperCase();
-                               String sql = "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=DATABASE() AND TABLE_NAME='"+tbl+"'";
-                               ResultSet rs = executeSQLRead(sql);
+                               final String[] split = tableName.split("\\.");
+                               String tbl = (split.length==2)?split[1]:tableName;
+                               String localSchema = (split.length==2)?split[0]:getSchema();
+                               StringBuilder sql=new StringBuilder();
+                               sql.append("SELECT COLUMN_NAME, DATA_TYPE, COLUMN_KEY FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=");
+                               if(localSchema==null) {
+                                       sql.append("DATABASE() AND TABLE_NAME='");
+                               }
+                               else {
+                                       sql.append("'").append(localSchema).append("' AND TABLE_NAME='");
+                               }
+                               sql.append(tbl).append("';");
+                               ResultSet rs = executeSQLRead(sql.toString());
                                if (rs != null) {
                                        ti = new TableInfo();
                                        while (rs.next()) {
@@ -296,9 +342,13 @@ mysql> describe tables;
                }
        }
        @Override
-       public void createSQLTriggers(String tableName) {
-               // Don't create triggers for the table the triggers write into!!!
+       public void createSQLTriggers(String table) {
+               final String[] split = table.split("\\.");
+               String schemaName = (split.length==2)?split[0]:getSchema();
+               String tableName = (split.length==2)?split[1]:table;
+
                if (tableName.equals(TRANS_TBL))
+                       // Don't create triggers for the table the triggers write into!!!
                        return;
                try {
                        if (!server_tbl_created) {
@@ -321,12 +371,12 @@ mysql> describe tables;
                                //msm.register(name);
                        }
                        // No SELECT trigger
-                       executeSQLWrite(generateTrigger(tableName, "INSERT"));
-                       executeSQLWrite(generateTrigger(tableName, "UPDATE"));
+                       executeSQLWrite(generateTrigger(schemaName,tableName, "INSERT"));
+                       executeSQLWrite(generateTrigger(schemaName,tableName, "UPDATE"));
                        //\TODO: save key row instead of the whole row for delete
-                       executeSQLWrite(generateTrigger(tableName, "DELETE"));
+                       executeSQLWrite(generateTrigger(schemaName,tableName, "DELETE"));
                } catch (SQLException e) {
-                       if (e.getMessage().equals("Trigger already exists"){
+                       if (e.getMessage().equals("Trigger already exists") || e.getMessage().endsWith("already exists")){
                                //only warn if trigger already exists
                                logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e);
                        } else {
@@ -343,7 +393,7 @@ END;
 OLD.field refers to the old value
 NEW.field refers to the new value
 */
-       private String generateTrigger(String tableName, String op) {
+       private String generateTrigger(String schema, String tableName, String op) {
                boolean isdelete = op.equals("DELETE");
                boolean isinsert = op.equals("INSERT");
                boolean isupdate = op.equals("UPDATE");
@@ -371,25 +421,27 @@ NEW.field refers to the new value
                //\TODO check if using mysql driver, so instead check the exception
         //\TODO add conditional for update, if primary key is still the same, use null in the KEYDATA col
                StringBuilder sb = new StringBuilder()
-                 .append("CREATE TRIGGER ")            // IF NOT EXISTS not supported by MySQL!
-                 .append(String.format("%s_%s", op.substring(0, 1), tableName))
-                 .append(" AFTER ")
-                 .append(op)
-                 .append(" ON ")
-                 .append(tableName.toUpperCase())
-                 .append(" FOR EACH ROW INSERT INTO ")
-                 .append(TRANS_TBL)
-                 .append(" (TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('")
-                 .append(tableName.toUpperCase())
-                 .append("', ")
-                 .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'"))
-                 .append(", ")
-                 .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL")
-                 .append(", ")
-                 .append(newJson.toString())
-                 .append(", ")
-                 .append("CONNECTION_ID()")
-                 .append(")");
+                       .append("CREATE TRIGGER ")              // IF NOT EXISTS not supported by MySQL!
+                       .append(String.format("%s_%s", op.substring(0, 1), tableName))
+                       .append(" AFTER ")
+                       .append(op)
+                       .append(" ON ")
+                       .append(tableName)
+                       .append(" FOR EACH ROW INSERT INTO ")
+                       .append(TRANS_TBL)
+                       .append(" (SCHEMANAME, TABLENAME, OP, KEYDATA, ROWDATA, CONNECTION_ID) VALUES('")
+                       .append( (schema==null)?this.getSchema():schema )
+                       .append("', '")
+                       .append(tableName)
+                       .append("', ")
+                       .append(isdelete ? "'D'" : (op.equals("INSERT") ? "'I'" : "'U'"))
+                       .append(", ")
+                       .append( (keyJson.length()>"JSON_OBJECT()".length()) ? keyJson.toString() : "NULL")
+                       .append(", ")
+                       .append(newJson.toString())
+                       .append(", ")
+                       .append("CONNECTION_ID()")
+                       .append(")");
                return sb.toString();
        }
        private String[] getTriggerNames(String tableName) {
@@ -521,6 +573,16 @@ NEW.field refers to the new value
                return rs;
        }
 
+       @Override
+       public void preCommitHook() {
+               synchronized (stagingHandlerLock){
+                   //\TODO check if this can potentially block forever in certain scenarios
+                       if(stagingHandler!=null){
+                               stagingHandler.waitForAllPendingUpdates();
+                       }
+               }
+       }
+
        /**
         * This method executes a write query in the sql database.
         * @param sql the SQL to be sent to MySQL
@@ -569,11 +631,24 @@ NEW.field refers to the new value
                        String[] parts = sql.trim().split(" ");
                        String cmd = parts[0].toLowerCase();
                        if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
-                               try {
-                                       this.updateStagingTable(transactionDigest);
-                               } catch (NoSuchFieldException|MDBCServiceException e) {
-                                       // TODO Auto-generated catch block
-                                       e.printStackTrace();
+                               if (useAsyncStagingUpdate) {
+                                       synchronized (stagingHandlerLock){
+                                               if(stagingHandler==null||currentStaging!=transactionDigest){
+                                                       Runnable newRunnable = new StagingTableUpdateRunnable(this, transactionDigest);
+                                                       currentStaging=transactionDigest;
+                                                       stagingHandler=new AsyncUpdateHandler(newRunnable);
+                                               }
+                                               //else we can keep using the current staging Handler
+                                       }
+                                       stagingHandler.processNewUpdate();
+                               } else {
+
+                                       try {
+                                               this.updateStagingTable(transactionDigest);
+                                       } catch (NoSuchFieldException | MDBCServiceException e) {
+                                               // TODO Auto-generated catch block
+                                               this.logger.error("Error updating the staging table");
+                                       }
                                }
                        }
                }
@@ -604,7 +679,7 @@ NEW.field refers to the new value
                throws NoSuchFieldException, MDBCServiceException {
                // copy from DB.MDBC_TRANSLOG where connid == myconnid
                // then delete from MDBC_TRANSLOG
-               String sql2 = "SELECT IX, TABLENAME, OP, ROWDATA,KEYDATA FROM "+TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId;
+               String sql2 = "SELECT IX, SCHEMANAME, TABLENAME, OP, ROWDATA, KEYDATA FROM " + TRANS_TBL +" WHERE CONNECTION_ID = " + this.connId;
                Integer biggestIx = Integer.MIN_VALUE;
                Integer smallestIx = Integer.MAX_VALUE;
                try {
@@ -616,10 +691,11 @@ NEW.field refers to the new value
                                smallestIx = Integer.min(smallestIx,ix);
                                String op   = rs.getString("OP");
                                SQLOperation opType = toOpEnum(op);
+                               String schema= rs.getString("SCHEMANAME");
                                String tbl  = rs.getString("TABLENAME");
                                String newRowStr = rs.getString("ROWDATA");
                                String rowStr = rs.getString("KEYDATA");
-                               Range range = new Range(tbl);
+                               Range range = new Range(schema+"."+tbl);
                                transactionDigests.addOperation(range,opType,newRowStr,rowStr);
                                rows.add(ix);
                        }
@@ -1056,7 +1132,7 @@ NEW.field refers to the new value
        private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
                logger.info("Clearing replayed operations");
                String sql = "DELETE FROM " + TRANS_TBL + " WHERE CONNECTION_ID = " + this.connId; 
-               jdbcStmt.executeQuery(sql);
+               jdbcStmt.executeUpdate(sql);
        }
 
        @Override
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
new file mode 100755 (executable)
index 0000000..0f66731
--- /dev/null
@@ -0,0 +1,1066 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.mixins;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.*;
+import java.util.Map.Entry;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.delete.Delete;
+import net.sf.jsqlparser.statement.insert.Insert;
+import net.sf.jsqlparser.statement.update.Update;
+import org.json.JSONObject;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Configuration;
+import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.mixins.MySQLMixin.StagingTableUpdateRunnable;
+import org.onap.music.mdbc.tables.Operation;
+import org.onap.music.mdbc.query.SQLOperation;
+import org.onap.music.mdbc.tables.StagingTable;
+import org.postgresql.util.PGInterval;
+import org.postgresql.util.PGobject;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * This class provides the methods that MDBC needs in order to mirror data to/from a
+ * <a href="https://dev.mysql.com/">MySQL</a> or <a href="http://mariadb.org/">MariaDB</a> database instance. This class
+ * uses the <code>JSON_OBJECT()</code> database function, which means it requires the following minimum versions of
+ * either database:
+ * <table summary="">
+ * <tr>
+ * <th>DATABASE</th>
+ * <th>VERSION</th>
+ * </tr>
+ * <tr>
+ * <td>MySQL</td>
+ * <td>5.7.8</td>
+ * </tr>
+ * <tr>
+ * <td>MariaDB</td>
+ * <td>10.2.3 (Note: 10.2.3 is currently (July 2017) a <i>beta</i> release)</td>
+ * </tr>
+ * </table>
+ *
+ * @author Robert P. Eby
+ */
+public class PostgresMixin implements DBInterface {
+
+    private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(PostgresMixin.class);
+
+    public static final String MIXIN_NAME = "postgres";
+    public static final String TRANS_TBL_SCHEMA = "audit";
+    public static final String TRANS_TBL = "mdbc_translog";
+
+    private final MusicInterface mi;
+    private final String connId;
+    private final String dbName;
+    private final String schema;
+    private final Connection jdbcConn;
+    private final Map<String, TableInfo> tables;
+    private PreparedStatement deleteStagingStatement;
+    private boolean useAsyncStagingUpdate = false;
+    private Object stagingHandlerLock = new Object();
+    private AsyncUpdateHandler stagingHandler = null;
+    private StagingTable currentStaging = null;
+
+    public PostgresMixin() {
+        this.mi = null;
+        this.connId = "";
+        this.dbName = null;
+        this.schema = null;
+        this.jdbcConn = null;
+        this.tables = null;
+        this.deleteStagingStatement = null;
+    }
+
+    private void initializeDeleteStatement() throws SQLException {
+        deleteStagingStatement = jdbcConn.prepareStatement("DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL
+                + " WHERE (ix BETWEEN ? AND ? ) AND " + "connection_id = ?;");
+    }
+
+    public PostgresMixin(MusicInterface mi, String url, Connection conn, Properties info) throws SQLException {
+        this.mi = mi;
+        this.connId = generateConnID(conn);
+        this.dbName = getDBName(conn);
+        this.schema = getSchema(conn);
+        this.jdbcConn = conn;
+        this.tables = new HashMap<>();
+        useAsyncStagingUpdate = Boolean.parseBoolean(info.getProperty(Configuration.KEY_ASYNC_STAGING_TABLE_UPDATE,
+                Configuration.ASYNC_STAGING_TABLE_UPDATE));
+        initializePostgresTriggersStructures();
+        initializeDeleteStatement();
+    }
+
+    class StagingTableUpdateRunnable implements Runnable {
+
+        private PostgresMixin mixin;
+        private StagingTable staging;
+
+        StagingTableUpdateRunnable(PostgresMixin mixin, StagingTable staging) {
+            this.mixin = mixin;
+            this.staging = staging;
+        }
+
+        @Override
+        public void run() {
+            try {
+                this.mixin.updateStagingTable(staging);
+            } catch (NoSuchFieldException | MDBCServiceException e) {
+                this.mixin.logger.error("Error when updating the staging table");
+            }
+        }
+    }
+
+
+    private void createTriggerTable() throws SQLException {
+        final String createSchemaSQL = "CREATE SCHEMA IF NOT EXISTS " + TRANS_TBL_SCHEMA + ";";
+        final String revokeCreatePrivilegesSQL = "REVOKE CREATE ON schema " + TRANS_TBL_SCHEMA + " FROM public;";
+        final String createTableSQL = "CREATE TABLE IF NOT EXISTS " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " ("
+                + "ix serial," + "op TEXT NOT NULL CHECK (op IN ('I','D','U'))," + "schema_name text NOT NULL,"
+                + "table_name text NOT NULL," + "original_data json," + "new_data json," + "connection_id text,"
+                + "PRIMARY KEY (connection_id,ix)" + ") WITH (fillfactor=100);";
+        final String revokeSQL = "REVOKE INSERT,UPDATE,DELETE,TRUNCATE,REFERENCES,TRIGGER ON " + TRANS_TBL_SCHEMA + "."
+                + TRANS_TBL + " FROM public;";
+        final String grantSelectSQL = "GRANT SELECT ON " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " TO public;";
+        final String createIndexSQL = "CREATE INDEX IF NOT EXISTS logged_actions_connection_id_idx" + " ON "
+                + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " (connection_id);";
+        Map<String, String> sqlStatements = new LinkedHashMap<String, String>() {
+            {
+                put("create_schema", createSchemaSQL);
+                put("revoke_privileges", revokeCreatePrivilegesSQL);
+                put("create_table", createTableSQL);
+                put("revoke_sql", revokeSQL);
+                put("grant_select", grantSelectSQL);
+                put("create_index", createIndexSQL);
+            }
+        };
+        for (Entry<String, String> query : sqlStatements.entrySet()) {
+            int retryCount = 0;
+            boolean ready = false;
+            while (retryCount < 3 && !ready) {
+                try {
+                    Statement statement = jdbcConn.createStatement();
+                    statement.executeUpdate(query.getValue());
+                    if (!jdbcConn.getAutoCommit()) {
+                        jdbcConn.commit();
+                    }
+                    statement.close();
+                    ready = true;
+                } catch (SQLException e) {
+                    if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated") || e.getMessage()
+                            .toLowerCase().startsWith("error: duplicate key value violates unique constraint")) {
+                        logger.warn("Error creating schema, retrying. for " + query.getKey(), e);
+                        try {
+                            Thread.sleep(100);
+                        } catch (InterruptedException e1) {
+                        }
+                    } else {
+                        logger.error("Error executing " + query.getKey(), e);
+                        throw e;
+                    }
+                }
+                retryCount++;
+            }
+        }
+    }
+
+    private String updateTriggerSection() {
+        return "IF (TG_OP = 'UPDATE') THEN\n" + "v_old_data := row_to_json(OLD);\n"
+                + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA + "." + TRANS_TBL
+                + " (op,schema_name,table_name,original_data,new_data,connection_id)\n"
+                + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,v_new_data,pg_backend_pid());\n"
+                + "RETURN NEW; ";
+    }
+
+    private String insertTriggerSection() {
+        // \TODO add additinoal conditional on change "IF NEW IS DISTINCT FROM OLD THEN"
+        return "IF (TG_OP = 'INSERT') THEN\n" + "v_new_data := row_to_json(NEW);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA
+                + "." + TRANS_TBL + " (op,schema_name,table_name,new_data,connection_id)\n"
+                + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_new_data,pg_backend_pid());\n"
+                + "RETURN NEW; ";
+    }
+
+    private String deleteTriggerSection() {
+        return "IF (TG_OP = 'DELETE') THEN\n" + "v_old_data := row_to_json(OLD);\n" + "INSERT INTO " + TRANS_TBL_SCHEMA
+                + "." + TRANS_TBL + " (op,schema_name,table_name,original_data,connection_id)\n"
+                + "VALUES (substring(TG_OP,1,1),TG_TABLE_SCHEMA::TEXT,TG_TABLE_NAME::TEXT,v_old_data,pg_backend_pid());\n"
+                + "RETURN OLD; ";
+    }
+
+    private String functionName(SQLOperation type) {
+        final String functionName = (type.equals(SQLOperation.UPDATE)) ? "if_updated_func"
+                : (type.equals(SQLOperation.INSERT)) ? "if_inserted_func" : "if_deleted_func";
+        return "audit." + functionName + "()";
+    }
+
+    private void createTriggerFunctions(SQLOperation type) throws SQLException {
+        StringBuilder functionSQL =
+                new StringBuilder("CREATE OR REPLACE FUNCTION " + functionName(type) + " RETURNS TRIGGER AS $body$\n"
+                        + "DECLARE\n" + "v_old_data json;\n" + "v_new_data json;\n" + "BEGIN\n");
+        switch (type) {
+            case UPDATE:
+                functionSQL.append(updateTriggerSection());
+                break;
+            case INSERT:
+                functionSQL.append(insertTriggerSection());
+                break;
+            case DELETE:
+                functionSQL.append(deleteTriggerSection());
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid operation type for creation of trigger functions");
+        }
+        functionSQL.append("ELSE\n"
+                + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - Other action occurred: %, at %',TG_OP,now();\n"
+                + "RETURN NULL;\n" + "END IF;\n" + "EXCEPTION\n" + "WHEN data_exception THEN\n"
+                + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+                + "RETURN NULL;\n" + "WHEN unique_violation THEN\n"
+                + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+                + "RETURN NULL;\n" + "WHEN OTHERS THEN\n"
+                + "RAISE WARNING '[AUDIT.IF_MODIFIED_FUNC] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;\n"
+                + "RETURN NULL;\n" + "END;\n" + "$body$\n" + "LANGUAGE plpgsql\n" + "SECURITY DEFINER\n"
+                + "SET search_path = pg_catalog, audit;");
+        int retryCount = 0;
+        boolean ready = false;
+        while (retryCount < 3 && !ready) {
+            try {
+                executeSQLWrite(functionSQL.toString());
+                ready = true;
+            } catch (SQLException e) {
+                if (e.getMessage().equalsIgnoreCase("ERROR: tuple concurrently updated")) {
+                    logger.warn("Error creating schema, retrying. ", e);
+                    try {
+                        Thread.sleep(200);
+                    } catch (InterruptedException e1) {
+                    }
+                } else {
+                    logger.error("Error executing creation of trigger function", e);
+                    throw e;
+                }
+            }
+            retryCount++;
+        }
+    }
+
+    private void initializePostgresTriggersStructures() throws SQLException {
+        try {
+            createTriggerTable();
+        } catch (SQLException e) {
+            logger.error("Error creating the trigger tables in postgres", e);
+            throw e;
+        }
+        try {
+            createTriggerFunctions(SQLOperation.INSERT);
+            createTriggerFunctions(SQLOperation.UPDATE);
+            createTriggerFunctions(SQLOperation.DELETE);
+        } catch (SQLException e) {
+            logger.error("Error creating the trigger functions in postgres", e);
+            throw e;
+        }
+    }
+
+    // This is used to generate a unique connId for this connection to the DB.
+    private String generateConnID(Connection conn) {
+        String rv = Integer.toString((int) System.currentTimeMillis()); // random-ish
+        try {
+            Statement stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery("SELECT pg_backend_pid() AS IX");
+            if (rs.next()) {
+                rv = rs.getString("IX");
+            }
+            stmt.close();
+        } catch (SQLException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, "generateConnID: problem generating a connection ID!");
+        }
+        return rv;
+    }
+
+    /**
+     * Get the name of this DBnterface mixin object.
+     * 
+     * @return the name
+     */
+    @Override
+    public String getMixinName() {
+        return MIXIN_NAME;
+    }
+
+    @Override
+    public void close() {
+        // nothing yet
+    }
+
+    /**
+     * Determines the db name associated with the connection This is the private/internal method that actually
+     * determines the name
+     * 
+     * @param conn
+     * @return
+     */
+    private String getDBName(Connection conn) {
+        String dbname = "mdbc"; // default name
+        try {
+            Statement stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery("SELECT current_database();");
+            if (rs.next()) {
+                dbname = rs.getString("current_database");
+            }
+            stmt.close();
+        } catch (SQLException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
+        }
+        return dbname;
+    }
+
+    /**
+     * Determines the db name associated with the connection This is the private/internal method that actually
+     * determines the name
+     * 
+     * @param conn
+     * @return
+     */
+    private String getSchema(Connection conn) {
+        String schema = "public"; // default name
+        try {
+            Statement stmt = conn.createStatement();
+            ResultSet rs = stmt.executeQuery("SELECT current_schema();");
+            if (rs.next()) {
+                schema = rs.getString("current_schema");
+            }
+            stmt.close();
+        } catch (SQLException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, "getDBName: problem getting database name from mysql");
+        }
+        return schema;
+    }
+
+    @Override
+    public String getSchema() {
+        return schema;
+    }
+
+    @Override
+    public String getDatabaseName() {
+        return this.dbName;
+    }
+
+    /**
+     * Get a set of the table names in the database.
+     * 
+     * @return the set
+     */
+    @Override
+    public Set<Range> getSQLRangeSet() {
+        Set<String> set = new TreeSet<String>();
+        String sql =
+                "SELECT table_name FROM information_schema.tables WHERE table_type='BASE TABLE' AND table_schema=current_schema();";
+        try {
+            Statement stmt = jdbcConn.createStatement();
+            ResultSet rs = stmt.executeQuery(sql);
+            while (rs.next()) {
+                String s = rs.getString("table_name");
+                set.add(s);
+            }
+            stmt.close();
+        } catch (SQLException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
+        }
+        logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
+        Set<Range> rangeSet = new HashSet<>();
+        for (String table : set) {
+            rangeSet.add(new Range(table));
+        }
+        return rangeSet;
+    }
+
+    /**
+     * Return a TableInfo object for the specified table. This method first looks in a cache of previously constructed
+     * TableInfo objects for the table. If not found, it queries the INFORMATION_SCHEMA.COLUMNS table to obtain the
+     * column names, types, and indexes of the table. It creates a new TableInfo object with the results.
+     * 
+     * @param tableName the table to look up
+     * @return a TableInfo object containing the info we need, or null if the table does not exist
+     */
+    @Override
+    public TableInfo getTableInfo(String tableName) {
+        // \TODO: invalidate "tables" when a table schema is modified (uncommon), but needs to be handled
+        TableInfo ti = tables.get(tableName);
+        if (ti == null) {
+            try {
+                String tbl, localSchema;
+                final String[] split = tableName.split("\\.");
+                if (split.length == 2) {
+                    localSchema = split[0];
+                    tbl = split[1];
+                } else {
+                    tbl = tableName;
+                    localSchema = this.schema;
+                }
+                String sql;
+                if (schema == null) {
+                    sql = "select column_name, data_type from information_schema.columns where table_schema=current_schema() and table_name='"
+                            + tbl + "';";
+                } else {
+                    sql = "select column_name, data_type from information_schema.columns where table_schema='"
+                            + localSchema + "' and table_name='" + tbl + "';";
+                }
+                ResultSet rs = executeSQLRead(sql);
+                if (rs != null) {
+                    ti = new TableInfo();
+                    while (rs.next()) {
+                        String name = rs.getString("column_name");
+                        String type = rs.getString("data_type");
+                        ti.columns.add(name);
+                        ti.coltype.add(mapDatatypeNameToType(type));
+                    }
+                    rs.getStatement().close();
+                } else {
+                    logger.error(EELFLoggerDelegate.errorLogger,
+                            "Cannot retrieve table info for table " + tableName + " from POSTGRES.");
+                    return null;
+                }
+                final String keysql =
+                        "SELECT a.attname as column_name FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid"
+                                + " AND a.attnum = ANY(i.indkey) WHERE  i.indrelid = '" + tbl + "'::regclass "
+                                + " AND i.indisprimary;";
+                ResultSet rs2 = executeSQLRead(keysql);
+                Set<String> keycols = new HashSet<>();
+                if (rs2 != null) {
+                    while (rs2.next()) {
+                        String name = rs2.getString("column_name");
+                        keycols.add(name);
+                    }
+                    rs2.getStatement().close();
+                } else {
+                    logger.error(EELFLoggerDelegate.errorLogger,
+                            "Cannot retrieve table info for table " + tableName + " from MySQL.");
+                }
+                for (String col : ti.columns) {
+                    if (keycols.contains(col)) {
+                        ti.iskey.add(true);
+                    } else {
+                        ti.iskey.add(false);
+                    }
+                }
+            } catch (SQLException e) {
+                logger.error(EELFLoggerDelegate.errorLogger,
+                        "Cannot retrieve table info for table " + tableName + " from MySQL: " + e);
+                return null;
+            }
+            tables.put(tableName, ti);
+        }
+        return ti;
+    }
+
+    // Map Postgres data type names to the java.sql.Types equivalent
+    private int mapDatatypeNameToType(String nm) {
+        switch (nm) {
+            case "character":
+                return Types.CHAR;
+            case "national character":
+                return Types.NCHAR;
+            case "character varying":
+                return Types.VARCHAR;
+            case "national character varying":
+                return Types.NVARCHAR;
+            case "text":
+                return Types.VARCHAR;
+            case "bytea":
+                return Types.BINARY;
+            case "smallint":
+                return Types.SMALLINT;
+            case "integer":
+                return Types.INTEGER;
+            case "bigint":
+                return Types.BIGINT;
+            case "smallserial":
+                return Types.SMALLINT;
+            case "serial":
+                return Types.INTEGER;
+            case "bigserial":
+                return Types.BIGINT;
+            case "real":
+                return Types.REAL;
+            case "double precision":
+                return Types.DOUBLE;
+            case "numeric":
+                return Types.NUMERIC;
+            case "decimal":
+                return Types.DECIMAL;
+            case "date":
+                return Types.DATE;
+            case "time with time zone":
+                return Types.TIME_WITH_TIMEZONE;
+            case "time without time zone":
+                return Types.TIME;
+            case "timestamp without time zone":
+                return Types.TIMESTAMP;
+            case "timestamp with time zone":
+                return Types.TIMESTAMP_WITH_TIMEZONE;
+            case "boolean":
+                return Types.BIT;
+            case "bit":
+                return Types.BIT;
+            case "oid":
+                return Types.BIGINT;
+            case "xml":
+                return Types.SQLXML;
+            case "array":
+                return Types.ARRAY;
+            case "tinyint":
+                return Types.TINYINT;
+            case "uuid":
+            case "money":
+            case "interval":
+            case "bit varying":
+            case "box":
+            case "point":
+            case "lseg":
+            case "path":
+            case "polygon":
+            case "circle":
+            case "json":
+            case "inet":
+            case "cidr":
+            case "macaddr":
+            case "tsvector":
+            case "tsquery":
+                return Types.OTHER;
+            default:
+                logger.error(EELFLoggerDelegate.errorLogger, "unrecognized and/or unsupported data type " + nm);
+                return Types.VARCHAR;
+        }
+    }
+
+    @Override
+    public void createSQLTriggers(String tableName) {
+        // Don't create triggers for the table the triggers write into!!!
+        if (tableName.equals(TRANS_TBL) || tableName.equals(TRANS_TBL_SCHEMA + "." + TRANS_TBL))
+            return;
+        try {
+            // No SELECT trigger
+            executeSQLWrite(generateTrigger(tableName, SQLOperation.INSERT));
+            executeSQLWrite(generateTrigger(tableName, SQLOperation.DELETE));
+            executeSQLWrite(generateTrigger(tableName, SQLOperation.UPDATE));
+        } catch (SQLException e) {
+            if (e.getMessage().trim().endsWith("already exists")) {
+                // only warn if trigger already exists
+                logger.warn(EELFLoggerDelegate.applicationLogger, "createSQLTriggers" + e);
+            } else {
+                logger.error(EELFLoggerDelegate.errorLogger, "createSQLTriggers: " + e);
+            }
+        }
+    }
+
+    private String generateTrigger(String tableName, SQLOperation op) {
+        StringBuilder triggerSql = new StringBuilder("CREATE TRIGGER ").append(getTriggerName(tableName, op))
+                .append(" AFTER " + op + " ON ").append(tableName).append(" FOR EACH ROW EXECUTE PROCEDURE ")
+                .append(functionName(op)).append(';');
+        return triggerSql.toString();
+    }
+
+    private String getTriggerName(String tableName, SQLOperation op) {
+        switch (op) {
+            case DELETE:
+                return "D_" + tableName;
+            case UPDATE:
+                return "U_" + tableName;
+            case INSERT:
+                return "I_" + tableName;
+            default:
+                throw new IllegalArgumentException("Invalid option in trigger operation type");
+        }
+    }
+
+    private String[] getTriggerNames(String tableName) {
+        return new String[] {getTriggerName(tableName, SQLOperation.INSERT),
+                getTriggerName(tableName, SQLOperation.DELETE), getTriggerName(tableName, SQLOperation.UPDATE),};
+    }
+
+    @Override
+    public void dropSQLTriggers(String tableName) {
+        try {
+            for (String name : getTriggerNames(tableName)) {
+                logger.debug(EELFLoggerDelegate.applicationLogger, "REMOVE trigger " + name + " from postgres");
+                executeSQLWrite("DROP TRIGGER IF EXISTS " + name + ";");
+            }
+        } catch (SQLException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, "dropSQLTriggers: " + e);
+        }
+    }
+
+    @Override
+    public void insertRowIntoSqlDb(String tableName, Map<String, Object> map) {
+        throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres");
+    }
+
+    @Override
+    public void deleteRowFromSqlDb(String tableName, Map<String, Object> map) {
+        throw new org.apache.commons.lang.NotImplementedException("Function not implemented yet in postgres");
+    }
+
+    /**
+     * This method executes a read query in the SQL database. Methods that call this method should be sure to call
+     * resultset.getStatement().close() when done in order to free up resources.
+     * 
+     * @param sql the query to run
+     * @return a ResultSet containing the rows returned from the query
+     */
+    @Override
+    public ResultSet executeSQLRead(String sql) {
+        logger.debug(EELFLoggerDelegate.applicationLogger, "Executing sql read in postgres");
+        logger.debug("Executing SQL read:" + sql);
+        ResultSet rs;
+        try {
+            Statement stmt = jdbcConn.createStatement();
+            rs = stmt.executeQuery(sql);
+        } catch (SQLException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, "executeSQLRead" + e);
+            return null;
+        }
+        return rs;
+    }
+
+    /**
+     * This method executes a write query in the sql database.
+     * 
+     * @param sql the SQL to be sent to MySQL
+     * @throws SQLException if an underlying JDBC method throws an exception
+     */
+    protected void executeSQLWrite(String sql) throws SQLException {
+        logger.debug(EELFLoggerDelegate.applicationLogger, "Executing SQL write:" + sql);
+        Statement stmt = jdbcConn.createStatement();
+        stmt.execute(sql);
+        stmt.close();
+    }
+
+    @Override
+    public void preCommitHook() {
+        synchronized (stagingHandlerLock) {
+            // \TODO check if this can potentially block forever in certain scenarios
+            if (stagingHandler != null) {
+                stagingHandler.waitForAllPendingUpdates();
+            }
+        }
+    }
+
+    /**
+     * Code to be run within the DB driver before a SQL statement is executed. This is where tables can be synchronized
+     * before a SELECT, for those databases that do not support SELECT triggers.
+     * 
+     * @param sql the SQL statement that is about to be executed
+     * @return list of keys that will be updated, if they can't be determined afterwards (i.e. sql table doesn't have
+     *         primary key)
+     */
+    @Override
+    public void preStatementHook(final String sql) {
+        if (sql == null) {
+            return;
+        }
+        // \TODO: check if anything needs to be executed here for postgres
+    }
+
+    /**
+     * Code to be run within the DB driver after a SQL statement has been executed. This is where remote statement
+     * actions can be copied back to Cassandra/MUSIC.
+     * 
+     * @param sql the SQL statement that was executed
+     */
+    @Override
+    public void postStatementHook(final String sql, StagingTable transactionDigest) {
+        if (sql != null) {
+            String[] parts = sql.trim().split(" ");
+            String cmd = parts[0].toLowerCase();
+            if ("delete".equals(cmd) || "insert".equals(cmd) || "update".equals(cmd)) {
+                if (useAsyncStagingUpdate) {
+                    synchronized (stagingHandlerLock) {
+                        if (stagingHandler == null || currentStaging != transactionDigest) {
+                            Runnable newRunnable =
+                                    new PostgresMixin.StagingTableUpdateRunnable(this, transactionDigest);
+                            currentStaging = transactionDigest;
+                            stagingHandler = new AsyncUpdateHandler(newRunnable);
+                        }
+                        // else we can keep using the current staging Handler
+                    }
+                    stagingHandler.processNewUpdate();
+                } else {
+
+                    try {
+                        this.updateStagingTable(transactionDigest);
+                    } catch (NoSuchFieldException | MDBCServiceException e) {
+                        // TODO Auto-generated catch block
+                        this.logger.error("Error updating the staging table");
+                    }
+                }
+            }
+        }
+    }
+
+    private SQLOperation toOpEnum(String operation) throws NoSuchFieldException {
+        switch (operation.toLowerCase()) {
+            case "i":
+                return SQLOperation.INSERT;
+            case "d":
+                return SQLOperation.DELETE;
+            case "u":
+                return SQLOperation.UPDATE;
+            case "s":
+                return SQLOperation.SELECT;
+            default:
+                logger.error(EELFLoggerDelegate.errorLogger, "Invalid operation selected: [" + operation + "]");
+                throw new NoSuchFieldException("Invalid operation enum");
+        }
+
+    }
+
+    /**
+     * Copy data that is in transaction table into music interface
+     * 
+     * @param transactionDigests
+     * @throws NoSuchFieldException
+     */
+    private void updateStagingTable(StagingTable transactionDigests) throws NoSuchFieldException, MDBCServiceException {
+        String selectSql = "select ix, op, schema_name, table_name, original_data,new_data FROM " + TRANS_TBL_SCHEMA
+                + "." + TRANS_TBL + " where connection_id = '" + this.connId + "';";
+        Integer biggestIx = Integer.MIN_VALUE;
+        Integer smallestIx = Integer.MAX_VALUE;
+        try {
+            ResultSet rs = executeSQLRead(selectSql);
+            Set<Integer> rows = new TreeSet<Integer>();
+            while (rs.next()) {
+                int ix = rs.getInt("ix");
+                biggestIx = Integer.max(biggestIx, ix);
+                smallestIx = Integer.min(smallestIx, ix);
+                String op = rs.getString("op");
+                SQLOperation opType = toOpEnum(op);
+                String schema = rs.getString("schema_name");
+                String tbl = rs.getString("table_name");
+                String original = rs.getString("original_data");
+                String newData = rs.getString("new_data");
+                Range range = new Range(schema + "." + tbl);
+                transactionDigests.addOperation(range, opType, newData, original);
+                rows.add(ix);
+            }
+            rs.getStatement().close();
+            if (rows.size() > 0) {
+                logger.debug("Staging delete: Executing with vals [" + smallestIx + "," + biggestIx + "," + this.connId
+                        + "]");
+                this.deleteStagingStatement.setInt(1, smallestIx);
+                this.deleteStagingStatement.setInt(2, biggestIx);
+                this.deleteStagingStatement.setString(3, this.connId);
+                this.deleteStagingStatement.execute();
+            }
+        } catch (SQLException e) {
+            logger.warn("Exception in postStatementHook: " + e);
+            e.printStackTrace();
+        }
+    }
+
+
+
+    /**
+     * Update music with data from MySQL table
+     *
+     * @param tableName - name of table to update in music
+     */
+    @Override
+    public void synchronizeData(String tableName) {}
+
+    /**
+     * Return a list of "reserved" names, that should not be used by MySQL client/MUSIC These are reserved for mdbc
+     */
+    @Override
+    public List<String> getReservedTblNames() {
+        ArrayList<String> rsvdTables = new ArrayList<String>();
+        rsvdTables.add(TRANS_TBL_SCHEMA + "." + TRANS_TBL);
+        rsvdTables.add(TRANS_TBL);
+        // Add others here as necessary
+        return rsvdTables;
+    }
+
+    @Override
+    public String getPrimaryKey(String sql, String tableName) {
+        return null;
+    }
+
+    /**
+     * Parse the transaction digest into individual events
+     * 
+     * @param transaction - base 64 encoded, serialized digest
+     */
+    public void replayTransaction(StagingTable transaction, List<Range> ranges)
+            throws SQLException, MDBCServiceException {
+        boolean autocommit = jdbcConn.getAutoCommit();
+        jdbcConn.setAutoCommit(false);
+        Statement jdbcStmt = jdbcConn.createStatement();
+        final ArrayList<Operation> opList = transaction.getOperationList();
+
+        for (Operation op : opList) {
+            if (Range.overlaps(ranges, op.getTable())) {
+                try {
+                    replayOperationIntoDB(jdbcStmt, op);
+                } catch (SQLException | MDBCServiceException e) {
+                    // rollback transaction
+                    logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "."
+                            + "Rolling back the entire digest replay.");
+                    jdbcConn.rollback();
+                    throw e;
+                }
+            }
+        }
+
+        clearReplayedOperations(jdbcStmt);
+        jdbcConn.commit();
+        jdbcStmt.close();
+        jdbcConn.setAutoCommit(autocommit);
+    }
+
+    @Override
+    public void disableForeignKeyChecks() throws SQLException {
+        Statement disable = jdbcConn.createStatement();
+        disable.execute("SET session_replication_role = 'replica';");
+        disable.closeOnCompletion();
+    }
+
+    @Override
+    public void enableForeignKeyChecks() throws SQLException {
+        Statement enable = jdbcConn.createStatement();
+        enable.execute("SET session_replication_role = 'origin';");
+        enable.closeOnCompletion();
+    }
+
+    @Override
+    public void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException {
+        replayTransaction(txDigest, ranges);
+    }
+
+    /**
+     * Replays operation into database, usually from txDigest
+     * 
+     * @param jdbcStmt: Connection used to perform the replay
+     * @param op: operation to be replayed
+     * @throws SQLException
+     * @throws MDBCServiceException
+     */
+    private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException {
+        logger.debug("Replaying Operation: " + op.getOperationType() + "->" + op.getVal());
+        JSONObject newVal = op.getVal();
+        JSONObject oldVal = null;
+        try {
+            oldVal = op.getKey();
+        } catch (MDBCServiceException e) {
+            // Ignore exception, in postgres the structure of the operation is different
+        }
+
+
+        TableInfo ti = getTableInfo(op.getTable());
+        final List<String> keyColumns = ti.getKeyColumns();
+
+        // build and replay the queries
+        String sql = constructSQL(op, keyColumns, newVal, oldVal);
+        if (sql == null)
+            return;
+
+        try {
+            logger.debug("Replaying operation: " + sql);
+            int updated = jdbcStmt.executeUpdate(sql);
+
+            if (updated == 0) {
+                // This applies only for replaying transactions involving Eventually Consistent tables
+                logger.warn(
+                        "Error Replaying operation: " + sql + "; Replacing insert/replace/viceversa and replaying ");
+
+                buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal);
+            }
+        } catch (SQLException sqlE) {
+            // This applies for replaying transactions involving Eventually Consistent tables
+            // or transactions that replay on top of existing keys
+            logger.warn(
+                    "Error Replaying operation: " + sql + ";" + "Replacing insert/replace/viceversa and replaying ");
+
+            buildAndExecuteSQLInverse(jdbcStmt, op, keyColumns, newVal, oldVal);
+
+        }
+    }
+
+    protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op, List<String> keyColumns,
+            JSONObject newVals, JSONObject oldVals) throws SQLException, MDBCServiceException {
+        String sqlInverse = constructSQLInverse(op, keyColumns, newVals, oldVals);
+        if (sqlInverse == null)
+            return;
+        logger.debug("Replaying operation: " + sqlInverse);
+        jdbcStmt.executeUpdate(sqlInverse);
+    }
+
+    protected String constructSQLInverse(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+            throws MDBCServiceException {
+        String sqlInverse = null;
+        switch (op.getOperationType()) {
+            case INSERT:
+                sqlInverse = constructUpdate(op.getTable(), keyColumns, newVals, oldVals);
+                break;
+            case UPDATE:
+                sqlInverse = constructInsert(op.getTable(), newVals);
+                break;
+            default:
+                break;
+        }
+        return sqlInverse;
+    }
+
+    protected String constructSQL(Operation op, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+            throws MDBCServiceException {
+        String sql = null;
+        switch (op.getOperationType()) {
+            case INSERT:
+                sql = constructInsert(op.getTable(), newVals);
+                break;
+            case UPDATE:
+                sql = constructUpdate(op.getTable(), keyColumns, newVals, oldVals);
+                break;
+            case DELETE:
+                sql = constructDelete(op.getTable(), keyColumns, oldVals);
+                break;
+            case SELECT:
+                // no update happened, do nothing
+                break;
+            default:
+                logger.error(op.getOperationType() + "not implemented for replay");
+        }
+        return sql;
+    }
+
+    private String constructDelete(String tableName, List<String> keyColumns, JSONObject oldVals)
+            throws MDBCServiceException {
+        if (oldVals == null) {
+            throw new MDBCServiceException("Trying to update row with an empty old val exception");
+        }
+        StringBuilder sql = new StringBuilder();
+        sql.append("DELETE FROM ");
+        sql.append(tableName + " WHERE ");
+        sql.append(getPrimaryKeyConditional(keyColumns, oldVals));
+        sql.append(";");
+        return sql.toString();
+    }
+
+    private String constructInsert(String tableName, JSONObject newVals) {
+        StringBuilder keys = new StringBuilder();
+        StringBuilder vals = new StringBuilder();
+        String sep = "";
+        for (String col : newVals.keySet()) {
+            keys.append(sep + col);
+            vals.append(sep + "'" + newVals.get(col) + "'");
+            sep = ", ";
+        }
+        StringBuilder sql = new StringBuilder();
+        sql.append("INSERT INTO ").append(tableName + " (").append(keys).append(") VALUES (").append(vals).append(");");
+        return sql.toString();
+    }
+
+    private String constructUpdate(String tableName, List<String> keyColumns, JSONObject newVals, JSONObject oldVals)
+            throws MDBCServiceException {
+        if (oldVals == null) {
+            throw new MDBCServiceException("Trying to update row with an empty old val exception");
+        }
+        StringBuilder sql = new StringBuilder();
+        sql.append("UPDATE ").append(tableName).append(" SET ");
+        String sep = "";
+        for (String key : newVals.keySet()) {
+            sql.append(sep).append(key).append("=\"").append(newVals.get(key)).append("\"");
+            sep = ", ";
+        }
+        sql.append(" WHERE ");
+        sql.append(getPrimaryKeyConditional(keyColumns, oldVals));
+        sql.append(";");
+        return sql.toString();
+    }
+
+    /**
+     * Create an SQL string for AND'ing all of the primary keys
+     * 
+     * @param keyColumns list with the name of the columns that are key
+     * @param vals json with the contents of the old row
+     * @return string in the form of PK1=Val1 AND PK2=Val2 AND PK3=Val3
+     */
+    private String getPrimaryKeyConditional(List<String> keyColumns, JSONObject vals) {
+        StringBuilder keyCondStmt = new StringBuilder();
+        String and = "";
+        for (String key : keyColumns) {
+            // We cannot use the default primary key for the sql table and operations
+            if (!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
+                Object val = vals.get(key);
+                keyCondStmt.append(and + key + "=\"" + val + "\"");
+                and = " AND ";
+            }
+        }
+        return keyCondStmt.toString();
+    }
+
+    /**
+     * Cleans out the transaction table, removing the replayed operations
+     * 
+     * @param jdbcStmt
+     * @throws SQLException
+     */
+    private void clearReplayedOperations(Statement jdbcStmt) throws SQLException {
+        logger.info("Clearing replayed operations");
+        String sql =
+                "DELETE FROM " + TRANS_TBL_SCHEMA + "." + TRANS_TBL + " WHERE CONNECTION_ID = '" + this.connId + "';";
+        jdbcStmt.executeUpdate(sql);
+    }
+
+    @Override
+    public Connection getSQLConnection() {
+        return jdbcConn;
+    }
+
+    @Override
+    public Set<String> getSQLTableSet() {
+        Set<String> set = new TreeSet<String>();
+        String sql =
+                "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=CURRENT_SCHEMA() AND TABLE_TYPE='BASE TABLE'";
+        try {
+            Statement stmt = jdbcConn.createStatement();
+            ResultSet rs = stmt.executeQuery(sql);
+            while (rs.next()) {
+                String s = rs.getString("TABLE_NAME");
+                set.add(s);
+            }
+            stmt.close();
+        } catch (SQLException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, "getSQLTableSet: " + e);
+            System.out.println("getSQLTableSet: " + e);
+            e.printStackTrace();
+        }
+        logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
+        System.out.println("getSQLTableSet returning: " + set);
+        return set;
+    }
+
+}
index 86088f9..171ad79 100755 (executable)
@@ -190,28 +190,6 @@ public class Utils {
                }
                return list;
        }
-       
-       public static void registerDefaultDrivers() {
-               Properties pr = null;
-               try {
-                       pr = new Properties();
-                       pr.load(Utils.class.getResourceAsStream("/mdbc.properties"));
-               }
-               catch (IOException e) {
-                       logger.error("Could not load property file > " + e.getMessage());
-               }
-               
-               @SuppressWarnings("unused")
-               List<Class<?>> list = new ArrayList<Class<?>>();
-               String drivers = pr.getProperty("DEFAULT_DRIVERS");
-               for (String driver: drivers.split("[ ,]")) {
-                       logger.info(EELFLoggerDelegate.applicationLogger, "Registering jdbc driver '" + driver + "'");
-                       try {
-                               @SuppressWarnings("unused")
-                               Class<?> cl = Class.forName(driver.trim());
-                       } catch (ClassNotFoundException e) {
-                               logger.error(EELFLoggerDelegate.errorLogger,"Driver class "+driver+" not found.");
-                       }
-               }               
-       }
+
+
 }
index 4f3a3bf..5c6fae4 100644 (file)
@@ -89,9 +89,10 @@ public class MusicTxDigestDaemon implements Runnable {
                        logger.error("Music interface or DB interface is null in background daemon");
                        return;
                }
+               MdbcConnection conn = null;
                while (true) {
                        try {
-                               MdbcConnection conn = (MdbcConnection) stateManager.getConnection("daemon");
+                               conn = (MdbcConnection) stateManager.getConnection("daemon");
                                if (conn == null) {
                                        logger.error("Connection created is null in background daemon");
                                        return;
@@ -111,18 +112,20 @@ public class MusicTxDigestDaemon implements Runnable {
                                }
                                //2) for each partition I don't own
                                final Set<Range> warmupRanges = stateManager.getRangesToWarmup();
-                               final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
-                               List<Range> missingRanges = new ArrayList<>();
-                               if (currentPartitions.size() != 0) {
-                                       for (DatabasePartition part : currentPartitions) {
-                                               List<Range> partitionRanges = part.getSnapshot();
-                                               warmupRanges.removeAll(partitionRanges);
-                                       }
-                                       try {
-                                               stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
-                                       } catch (MDBCServiceException e) {
-                                               logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
-                                               continue;
+                               if (warmupRanges!=null) {
+                                       final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
+                                       List<Range> missingRanges = new ArrayList<>();
+                                       if (currentPartitions.size() != 0) {
+                                               for (DatabasePartition part : currentPartitions) {
+                                                       List<Range> partitionRanges = part.getSnapshot();
+                                                       warmupRanges.removeAll(partitionRanges);
+                                               }
+                                               try {
+                                                       stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+                                               } catch (MDBCServiceException e) {
+                                                       logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
+                                                       continue;
+                                               }
                                        }
                                }
 
@@ -138,6 +141,12 @@ public class MusicTxDigestDaemon implements Runnable {
                        } catch (InterruptedException | SQLException e) {
                                logger.error("MusicTxDigest background daemon stopped " + e.getMessage(), e);
                                Thread.currentThread().interrupt();
+                       } finally {
+                               try {
+                                       if (conn!=null && !conn.isClosed()) conn.close();
+                               } catch (SQLException e) {
+                                       logger.error("MusicTxDigest background daemon error closing" + e.getMessage(), e);
+                               }
                        }
                }
        }
index 8fa49a9..db9e455 100755 (executable)
@@ -22,10 +22,18 @@ package org.onap.music.mdbc.tables;
 import java.util.UUID;
 
 public final class MusicTxDigestId {
+    public final UUID mriId;
        public final UUID transactionId;
        public final int index;
 
+       public MusicTxDigestId(UUID mriRowId, UUID digestId, int index) {
+           this.mriId=mriRowId;
+               this.transactionId= digestId;
+               this.index=index;
+       }
+
        public MusicTxDigestId(UUID digestId, int index) {
+               this.mriId = null;
                this.transactionId= digestId;
                this.index=index;
        }
index d406ad3..3258d0f 100755 (executable)
@@ -66,7 +66,7 @@ public final class Operation implements Serializable{
     }
 
        public JSONObject getKey() throws MDBCServiceException {
-           if(KEY==null){
+           if(KEY==null||KEY.isEmpty()){
             throw new MDBCServiceException("This operation ["+TYPE.toString()+"] doesn't contain a key");
         }
         JSONObject keys = new JSONObject(new JSONTokener(KEY));
index 4fdd7ac..4ef9d30 100755 (executable)
@@ -186,7 +186,10 @@ public class StagingTable {
         }
                OpType newType = (type==SQLOperation.INSERT)?OpType.INSERT:(type==SQLOperation.DELETE)?
                        OpType.DELETE:OpType.UPDATE;
-           Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType).setVal(newVal);
+           Row.Builder rowBuilder = Row.newBuilder().setTable(range.getTable()).setType(newType);
+           if(newVal!=null) {
+               rowBuilder.setVal(newVal);
+        }
            if(keys!=null){
                rowBuilder.setKey(keys);
         }
@@ -252,10 +255,10 @@ public class StagingTable {
     }
     
     synchronized public boolean isEventualEmpty() {
-        return (eventuallyBuilder.getRowsCount()==0);
+           return (eventuallyBuilder!=null) && (eventuallyBuilder.getRowsCount()==0);
     }
-       
-       synchronized public void clear() throws MDBCServiceException {
+
+    synchronized public void clear() throws MDBCServiceException {
         if(!builderInitialized){
             throw new MDBCServiceException("This type of staging table is unmutable, please use the constructor"
                 + "with no parameters");
index 7624201..f6239d1 100755 (executable)
@@ -24,6 +24,8 @@ import org.onap.music.mdbc.configurations.NodeConfiguration;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.UUID;
 
 public class CreatePartition {
@@ -51,7 +53,9 @@ public class CreatePartition {
     }
 
     public void convert(){
-        config = new NodeConfiguration(tables, eventual,UUID.fromString(mriIndex),"test","");
+        String[] tablesArray=eventual.split(",");
+        ArrayList<String> eventualTables = (new ArrayList<>(Arrays.asList(tablesArray)));
+        config = new NodeConfiguration(tables, eventualTables,UUID.fromString(mriIndex),"test","");
     }
 
     public void saveToFile(){
index 73e8f77..49fdfd2 100755 (executable)
@@ -4,10 +4,15 @@
 MIXINS= \
        org.onap.music.mdbc.mixins.MySQLMixin \
        org.onap.music.mdbc.mixins.MusicMixin \
-       org.onap.music.mdbc.mixins.Music2Mixin
+       org.onap.music.mdbc.mixins.Music2Mixin \
+       org.onap.music.mdbc.mixins.PostgresMixin
+
+DEFAULT_DB_MIXIN= mysql
+
+DEFAULT_MUSIC_MIXIN= cassandra2
 
 DEFAULT_DRIVERS=\
-       org.h2.Driver \
-       com.mysql.jdbc.Driver
+        org.mariadb.jdbc.Driver \
+        org.postgresql.Driver
 
-txdaemonsleeps=15
\ No newline at end of file
+txdaemonsleeps=15
index 4703d0e..87f8445 100755 (executable)
@@ -36,7 +36,7 @@ public class MDBCUtilsTest {
     public void toStringTest1() {
         StagingTable table = new StagingTable();
         try {
-            table.addOperation(new Range("TABLE1"),SQLOperation.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString(),null);
+            table.addOperation(new Range("TEST.TABLE1"),SQLOperation.INSERT,(new JSONObject(new String[]{"test3", "Test4"})).toString(),null);
         } catch (MDBCServiceException e) {
             fail();
         }
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java b/mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java
new file mode 100644 (file)
index 0000000..72ec8d3
--- /dev/null
@@ -0,0 +1,280 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import ch.vorburger.exec.ManagedProcessException;
+import ch.vorburger.mariadb4j.DB;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.opentable.db.postgres.embedded.EmbeddedPostgres;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Properties;
+import javax.sql.DataSource;
+import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.onap.music.datastore.MusicDataStore;
+import org.onap.music.datastore.MusicDataStoreHandle;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.lockingservice.cassandra.CassaLockStore;
+import org.onap.music.mdbc.mixins.MusicMixin;
+import org.onap.music.mdbc.mixins.PostgresMixin;
+
+public class MdbcTestUtils {
+
+    // Postgres variables
+    static EmbeddedPostgres pg=null;
+    static DataSource postgresDatabase=null;
+    final private static int postgresPort = 13307;
+    @Rule
+    public static TemporaryFolder tf = new TemporaryFolder();
+
+    // Cassandra variables
+    //Properties used to connect to music
+    private static Cluster cluster;
+    private static Session session;
+
+    //Mdbc variables
+    final private static String keyspace="metricmusictest";
+    final private static String mdbcServerName = "name";
+    final private static String mtdTableName = "musictxdigest";
+    final private static String eventualMtxdTableName = "musicevetxdigest";
+    final private static String mriTableName = "musicrangeinformation";
+    final private static String rangeDependencyTableName = "musicrangedependency";
+    final private static String nodeInfoTableName = "nodeinfo";
+    //Mariadb variables
+    static DB db=null;
+    final public static String mariaDBDatabaseName="test";
+    final static Integer mariaDbPort=13306;
+
+
+
+    public enum DBType {POSTGRES, MySQL}
+
+    public static String getCassandraUrl(){
+        return cluster.getMetadata().getAllHosts().iterator().next().getAddress().toString();
+
+    }
+
+    public static String getKeyspace(){
+        return keyspace;
+    }
+
+    public static String getServerName(){
+        return mdbcServerName;
+    }
+
+    public static String getMriTableName(){
+        return mriTableName;
+    }
+
+    public static String getMariaDbPort() {
+        return mariaDbPort.toString();
+    }
+
+    public static String getMariaDBDBName(){
+        return mariaDBDatabaseName;
+    }
+
+    static Connection getPostgreConnection() {
+        startPostgres();
+        Connection conn=null;
+        try
+        {
+            conn = postgresDatabase.getConnection();
+        } catch(SQLException e){
+            e.printStackTrace();
+            fail();
+        }
+        return conn;
+    }
+
+    static synchronized public void startPostgres(){
+        if(pg==null) {
+            try {
+                tf.create();
+                pg = EmbeddedPostgres.builder().setPort(postgresPort).setDataDirectory(tf.newFolder("tmp")+"/data-dir").start();
+            } catch (IOException e) {
+                e.printStackTrace();
+                fail();
+            }
+        }
+        if(postgresDatabase==null) {
+            postgresDatabase = pg.getPostgresDatabase();
+        }
+    }
+
+    static public String getPostgresUrl(){
+        return getPostgresUrlWithoutDb()+"/postgres";
+    }
+
+    static public String getPostgresUrlWithoutDb(){
+        return "jdbc:postgresql://localhost:"+Integer.toString(postgresPort);
+    }
+
+    synchronized static Connection getMariadbConnection(){
+        startMariaDb();
+        Connection conn = null;
+        try {
+            conn = DriverManager
+                .getConnection(getMariadbUrlWithoutDatabase()+"/"+mariaDBDatabaseName, "root", "");
+        } catch (SQLException e) {
+            e.printStackTrace();
+            fail("Error creating mdbc connection");
+        }
+        return conn;
+    }
+
+    public synchronized static void startMariaDb(){
+        if (db == null) {
+            try {
+                db=DB.newEmbeddedDB(mariaDbPort);
+                db.start();
+                db.createDB(mariaDBDatabaseName);
+            } catch (ManagedProcessException e) {
+                e.printStackTrace();
+                fail("error initializing embedded mariadb");
+            }
+        }
+    }
+
+    static String getMariadbUrlWithoutDatabase(){
+        return  "jdbc:mariadb://localhost:"+Integer.toString(mariaDbPort);
+    }
+
+    public static Connection getConnection(DBType type){
+        switch(type){
+            case MySQL:
+                return getMariadbConnection();
+            case POSTGRES:
+                return getPostgreConnection();
+            default:
+                fail("Wrong type for creating connection");
+        }
+        return null;
+    }
+
+    synchronized static void stopPostgres(){
+        postgresDatabase=null;
+        if(pg!=null) {
+            try {
+                pg.close();
+                pg=null;
+            } catch (IOException e) {
+                e.printStackTrace();
+                fail("Error closing postgres database");
+            }
+        }
+        if(tf!=null){
+            tf.delete();
+        }
+    }
+
+    static void stopMySql(){
+        try {
+            db.stop();
+        } catch (ManagedProcessException e) {
+            e.printStackTrace();
+            fail("Error closing mysql");
+        }
+    }
+
+    public static void cleanDatabase(DBType type){
+        switch(type) {
+            case MySQL:
+                stopMySql();
+                break;
+            case POSTGRES:
+                stopPostgres();
+                break;
+            default:
+                fail("Wrong type for creating connection");
+        }
+    }
+
+    public static void initCassandra(){
+        try {
+            EmbeddedCassandraServerHelper.startEmbeddedCassandra(EmbeddedCassandraServerHelper.CASSANDRA_RNDPORT_YML_FILE);
+        } catch (Exception e) {
+            System.out.println(e);
+            fail("Error starting embedded cassandra");
+        }
+        cluster=EmbeddedCassandraServerHelper.getCluster();
+        //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build();
+        cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000);
+        assertNotNull("Invalid configuration for cassandra", cluster);
+        session = EmbeddedCassandraServerHelper.getSession();
+        assertNotNull("Invalid configuration for cassandra", session);
+
+        MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session);
+        CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle);
+        assertNotNull("Invalid configuration for music", store);
+    }
+
+    public static void stopCassandra(){
+        try {
+            EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
+        }
+        catch(NullPointerException e){
+        }
+    }
+
+    public static Session getSession(){
+       return session;
+    }
+
+    public static MusicMixin getMusicMixin() throws MDBCServiceException {
+        initNamespaces();
+        initTables();
+        MusicMixin mixin=null;
+        try {
+            Properties properties = new Properties();
+            properties.setProperty(MusicMixin.KEY_MY_ID,MdbcTestUtils.getServerName());
+            properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,MdbcTestUtils.getKeyspace());
+            properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1");
+            properties.setProperty(MusicMixin.KEY_MUSIC_ADDRESS,MdbcTestUtils.getCassandraUrl());
+            mixin =new MusicMixin(null, MdbcTestUtils.getServerName(),properties);
+        } catch (MDBCServiceException e) {
+            fail("error creating music mixin");
+        }
+        return mixin;
+    }
+
+    public static void initNamespaces() throws MDBCServiceException{
+        MusicMixin.createKeyspace("music_internal",1);
+        MusicMixin.createKeyspace(keyspace,1);
+    }
+
+    public static void initTables() throws MDBCServiceException{
+        MusicMixin.createMusicRangeInformationTable(keyspace, mriTableName);
+        MusicMixin.createMusicTxDigest(mtdTableName,keyspace, -1);
+        MusicMixin.createMusicEventualTxDigest(eventualMtxdTableName,keyspace, -1);
+        MusicMixin.createMusicNodeInfoTable(nodeInfoTableName,keyspace,-1);
+        MusicMixin.createMusicRangeDependencyTable(keyspace,rangeDependencyTableName);
+    }
+
+}
index 2d31939..862e600 100644 (file)
 
 package org.onap.music.mdbc;
 
+import java.util.Properties;
 import org.junit.*;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 
+import org.onap.music.mdbc.MdbcTestUtils.DBType;
 import org.onap.music.mdbc.mixins.MySQLMixin;
 
 import ch.vorburger.mariadb4j.DB;
@@ -33,8 +35,8 @@ import ch.vorburger.mariadb4j.DB;
 public class MySQLMixinTest {
 
        public static final String DATABASE = "mdbctest";
-       public static final String TABLE= "Persons";
-       public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" +
+       public static final String TABLE= MdbcTestUtils.getMariaDBDBName();
+       public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + MdbcTestUtils.getMariaDBDBName()+ " (\n" +
             "    PersonID int,\n" +
             "    LastName varchar(255),\n" +
             "    FirstName varchar(255),\n" +
@@ -52,10 +54,8 @@ public class MySQLMixinTest {
        @BeforeClass
        public static void init() throws Exception {
                Class.forName("org.mariadb.jdbc.Driver");
-               //start embedded mariadb
-               DB db = DB.newEmbeddedDB(13306);
-               db.start();
-               db.createDB(DATABASE);
+               MdbcTestUtils.startMariaDb();
+
        }
        
        @AfterClass
@@ -65,13 +65,14 @@ public class MySQLMixinTest {
        
        @Before
        public void beforeTest() throws SQLException {
-               this.conn = DriverManager.getConnection("jdbc:mariadb://localhost:13306/"+DATABASE, "root", "");
-               this.mysqlMixin = new MySQLMixin(null, "localhost:13306/"+DATABASE, conn, null);
+               this.conn = MdbcTestUtils.getConnection(DBType.MySQL);
+               Properties info = new Properties();
+               this.mysqlMixin = new MySQLMixin(null, null, conn, info);
        }
        
        @Test
        public void testGetDataBaseName() throws SQLException {
-               Assert.assertEquals(DATABASE, mysqlMixin.getDatabaseName());
+               Assert.assertEquals(MdbcTestUtils.getMariaDBDBName(), mysqlMixin.getDatabaseName());
        }
 
 }
index 41a943e..c8d284e 100644 (file)
@@ -31,23 +31,35 @@ import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.function.Consumer;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.junit.Assert.*;
+
+import com.datastax.driver.core.Session;
+
+import java.util.*;
+
+
 import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
+
 import org.onap.music.datastore.MusicDataStore;
 import org.onap.music.datastore.MusicDataStoreHandle;
+
 import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.exceptions.MusicLockingException;
 import org.onap.music.exceptions.MusicQueryException;
 import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.lockingservice.cassandra.CassaLockStore;
 import org.onap.music.lockingservice.cassandra.MusicLockState;
 import org.onap.music.main.MusicCore;
 import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.MDBCUtils;
 import org.onap.music.mdbc.Range;
+
 import org.onap.music.mdbc.StateManager;
 import org.onap.music.mdbc.proto.ProtoDigest.Digest.CompleteDigest;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
@@ -57,37 +69,25 @@ import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
 import com.google.protobuf.InvalidProtocolBufferException;
 
+import org.onap.music.mdbc.MdbcTestUtils;
+import org.onap.music.mdbc.TestUtils;
+import org.onap.music.mdbc.ownership.Dag;
+import org.onap.music.mdbc.ownership.DagNode;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+
 public class MusicMixinTest {
        
-    final private static String keyspace="metricmusictest";
-    final private static String mriTableName = "musicrangeinformation";
-    final private static String mtdTableName = "musictxdigest";
-    final private static String mdbcServerName = "name";
+
 
     //Properties used to connect to music
-    private static Cluster cluster;
     private static Session session;
-    private static String cassaHost = "localhost";
     private static MusicMixin mixin = null;
     private StateManager stateManager;
 
     @BeforeClass
-    public static void init() throws MusicServiceException {
-        try {
-            EmbeddedCassandraServerHelper.startEmbeddedCassandra();
-        } catch (Exception e) {
-            System.out.println(e);
-        }
-        cluster=EmbeddedCassandraServerHelper.getCluster();
-        //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build();
-        cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000);
-        assertNotNull("Invalid configuration for cassandra", cluster);
-        session = EmbeddedCassandraServerHelper.getSession();
-        assertNotNull("Invalid configuration for cassandra", session);
+    public static void init() throws MDBCServiceException {
+        MdbcTestUtils.initCassandra();
 
-        MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session);
-        CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle);
-        assertNotNull("Invalid configuration for music", store);
     }
 
     @AfterClass
@@ -102,59 +102,52 @@ public class MusicMixinTest {
     }
 
     @Before
-    public void initTest(){
-        session.execute("DROP KEYSPACE IF EXISTS "+keyspace);
+    public void initTest() throws MDBCServiceException {
+        session = MdbcTestUtils.getSession();
+        session.execute("DROP KEYSPACE IF EXISTS "+ MdbcTestUtils.getKeyspace());
+        mixin=MdbcTestUtils.getMusicMixin();
+    }
+
+    //@Test(timeout=10000)
+    @Ignore  // TODO: Move ownership tests to OwnershipAndCheckpointTest 
+    @Test
+    public void own() {
+        Range range = new Range("TEST.TABLE1");
+        List<Range> ranges = new ArrayList<>();
+        ranges.add(range);
+        DatabasePartition partition=null;
         try {
-            Properties properties = new Properties();
-            properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace);
-            properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName);
-            properties.setProperty(MusicMixin.KEY_COMPRESSION, Boolean.toString(true));
-            mixin=new MusicMixin(stateManager, mdbcServerName,properties);
-        } catch (MDBCServiceException e) {
-            fail("error creating music mixin");
+            partition = TestUtils.createBasicRow(range, mixin, MdbcTestUtils.getServerName());
+        }
+        catch(Exception e){
+            fail("fail to create partition");
+        }
+        try {
+            TestUtils.unlockRow(MdbcTestUtils.getKeyspace(),MdbcTestUtils.getMriTableName(),partition);
+        } catch (MusicLockingException e) {
+            fail(e.getMessage());
         }
 
+        DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
+        try {
+            mixin.getStateManager().getOwnAndCheck().own(mixin,ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey());
+        } catch (MDBCServiceException e) {
+            fail("failure when running own function");
+        }
     }
 
-    //Own has been removed from musicMixin
-//    @Test(timeout=10000)
-//    public void own() {
-//        Range range = new Range("TABLE1");
-//        List<Range> ranges = new ArrayList<>();
-//        ranges.add(range);
-//    DatabasePartition partition=null;
-//    try {
-//      partition = TestUtils.createBasicRow(range, mixin, mdbcServerName);
-//    }
-//    catch(Exception e){
-//      fail(e.getMessage());
-//    }
-//    try {
-//      TestUtils.unlockRow(keyspace,mriTableName,partition);
-//    } catch (MusicLockingException e) {
-//      fail(e.getMessage());
-//    }
-//
-//        DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
-//        try {
-//            mixin.own(ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey());
-//        } catch (MDBCServiceException e) {
-//            fail("failure when running own function");
-//        }
-//    }
-
     private DatabasePartition addRow(List<Range> ranges,boolean isLatest){
         final UUID uuid = MDBCUtils.generateTimebasedUniqueKey();
         DatabasePartition dbPartition = new DatabasePartition(ranges,uuid,null);
         MusicRangeInformationRow newRow = new MusicRangeInformationRow(uuid,dbPartition, new ArrayList<>(), "",
-            mdbcServerName, isLatest);
+            MdbcTestUtils.getServerName(), isLatest);
         DatabasePartition partition=null;
         try {
             partition = mixin.createMusicRangeInformation(newRow);
         } catch (MDBCServiceException e) {
             fail("failure when creating new row");
         }
-        String fullyQualifiedMriKey = keyspace+"."+ mriTableName+"."+partition.getMRIIndex().toString();
+        String fullyQualifiedMriKey = MdbcTestUtils.getKeyspace()+"."+ MdbcTestUtils.getMriTableName()+"."+partition.getMRIIndex().toString();
         try {
             MusicLockState musicLockState = MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey, partition.getLockId());
         } catch (MusicLockingException e) {
@@ -163,81 +156,81 @@ public class MusicMixinTest {
         return partition;
     }
 
-    //Own has been removed from musicMixin
-//    @Test(timeout=10000)
-//    public void own2() throws InterruptedException, MDBCServiceException {
-//        List<Range> range12 = new ArrayList<>( Arrays.asList(
-//            new Range("RANGE1"),
-//            new Range("RANGE2")
-//        ));
-//        List<Range> range34 = new ArrayList<>( Arrays.asList(
-//            new Range("RANGE3"),
-//            new Range("RANGE4")
-//        ));
-//        List<Range> range24 = new ArrayList<>( Arrays.asList(
-//            new Range("RANGE2"),
-//            new Range("RANGE4")
-//        ));
-//        List<Range> range123 = new ArrayList<>( Arrays.asList(
-//            new Range("RANGE1"),
-//            new Range("RANGE2"),
-//            new Range("RANGE3")
-//        ));
-//        DatabasePartition db1 = addRow(range12, false);
-//        DatabasePartition db2 = addRow(range34, false);
-//        MILLISECONDS.sleep(10);
-//        DatabasePartition db3 = addRow(range12, true);
-//        DatabasePartition db4 = addRow(range34, true);
-//        MILLISECONDS.sleep(10);
-//        DatabasePartition db5 = addRow(range24, true);
-//        DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
-//        MusicInterface.OwnershipReturn own = null;
-//        try {
-//            own = mixin.own(range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey());
-//        } catch (MDBCServiceException e) {
-//            fail("failure when running own function");
-//        }
-//        Dag dag = own.getDag();
-//
-//        DagNode node4 = dag.getNode(db4.getMRIIndex());
-//        assertFalse(node4.hasNotIncomingEdges());
-//        List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges());
-//        assertEquals(1,outgoingEdges.size());
-//
-//        DagNode missing = outgoingEdges.get(0);
-//        Set<Range> missingRanges = missing.getRangeSet();
-//        assertEquals(2,missingRanges.size());
-//        assertTrue(missingRanges.contains(new Range("RANGE1")));
-//        assertTrue(missingRanges.contains(new Range("RANGE3")));
-//        List<DagNode> outgoingEdges1 = missing.getOutgoingEdges();
-//        assertEquals(1,outgoingEdges1.size());
-//
-//        DagNode finalNode = outgoingEdges1.get(0);
-//        assertFalse(finalNode.hasNotIncomingEdges());
-//        Set<Range> finalSet = finalNode.getRangeSet();
-//        assertEquals(3,finalSet.size());
-//        assertTrue(finalSet.contains(new Range("RANGE1")));
-//        assertTrue(finalSet.contains(new Range("RANGE2")));
-//        assertTrue(finalSet.contains(new Range("RANGE3")));
-//
-//        DagNode node5 = dag.getNode(db5.getMRIIndex());
-//        List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges();
-//        assertEquals(1,toRemoveOutEdges.size());
-//        toRemoveOutEdges.remove(finalNode);
-//        assertEquals(0,toRemoveOutEdges.size());
-//
-//        MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId());
-//        assertTrue(row.getIsLatest());
-//        DatabasePartition dbPartition = row.getDBPartition();
-//        List<Range> snapshot = dbPartition.getSnapshot();
-//        assertEquals(3,snapshot.size());
-//        MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId());
-//        assertFalse(node5row.getIsLatest());
-//        MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex());
-//        assertFalse(node4Row.getIsLatest());
-//        MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex());
-//        assertFalse(node3Row.getIsLatest());
-//    }
+    @Ignore  // TODO: Move ownership tests to OwnershipAndCheckpointTest 
+    @Test(timeout=1000)
+    public void own2() throws InterruptedException, MDBCServiceException {
+        List<Range> range12 = new ArrayList<>( Arrays.asList(
+            new Range("TEST.RANGE1"),
+            new Range("TEST.RANGE2")
+        ));
+        List<Range> range34 = new ArrayList<>( Arrays.asList(
+            new Range("TEST.RANGE3"),
+            new Range("TEST.RANGE4")
+        ));
+        List<Range> range24 = new ArrayList<>( Arrays.asList(
+            new Range("TEST.RANGE2"),
+            new Range("TEST.RANGE4")
+        ));
+        List<Range> range123 = new ArrayList<>( Arrays.asList(
+            new Range("TEST.RANGE1"),
+            new Range("TEST.RANGE2"),
+            new Range("TEST.RANGE3")
+        ));
+        DatabasePartition db1 = addRow(range12, false);
+        DatabasePartition db2 = addRow(range34, false);
+        MILLISECONDS.sleep(10);
+        DatabasePartition db3 = addRow(range12, true);
+        DatabasePartition db4 = addRow(range34, true);
+        MILLISECONDS.sleep(10);
+        DatabasePartition db5 = addRow(range24, true);
+        DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
+        MusicInterface.OwnershipReturn own = null;
+        try {
+            own = mixin.getStateManager().getOwnAndCheck().own(mixin,range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey());
+        } catch (MDBCServiceException e) {
+            fail("failure when running own function");
+        }
+        Dag dag = own.getDag();
+
+        DagNode node4 = dag.getNode(db4.getMRIIndex());
+        assertFalse(node4.hasNotIncomingEdges());
+        List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges());
+        assertEquals(1,outgoingEdges.size());
+
+        DagNode missing = outgoingEdges.get(0);
+        Set<Range> missingRanges = missing.getRangeSet();
+        assertEquals(2,missingRanges.size());
+        assertTrue(missingRanges.contains(new Range("TEST.RANGE1")));
+        assertTrue(missingRanges.contains(new Range("TEST.RANGE3")));
+        List<DagNode> outgoingEdges1 = missing.getOutgoingEdges();
+        assertEquals(1,outgoingEdges1.size());
+
+        DagNode finalNode = outgoingEdges1.get(0);
+        assertFalse(finalNode.hasNotIncomingEdges());
+        Set<Range> finalSet = finalNode.getRangeSet();
+        assertEquals(3,finalSet.size());
+        assertTrue(finalSet.contains(new Range("TEST.RANGE1")));
+        assertTrue(finalSet.contains(new Range("TEST.RANGE2")));
+        assertTrue(finalSet.contains(new Range("TEST.RANGE3")));
+
+        DagNode node5 = dag.getNode(db5.getMRIIndex());
+        List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges();
+        assertEquals(1,toRemoveOutEdges.size());
+        toRemoveOutEdges.remove(finalNode);
+        assertEquals(0,toRemoveOutEdges.size());
+
+        MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId());
+        assertTrue(row.getIsLatest());
+        DatabasePartition dbPartition = row.getDBPartition();
+        List<Range> snapshot = dbPartition.getSnapshot();
+        assertEquals(3,snapshot.size());
+        MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId());
+        assertFalse(node5row.getIsLatest());
+        MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex());
+        assertFalse(node4Row.getIsLatest());
+        MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex());
+        assertFalse(node3Row.getIsLatest());
+    }
 
     @Test
     public void relinquish() {
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java
new file mode 100644 (file)
index 0000000..2134a79
--- /dev/null
@@ -0,0 +1,220 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.mixins;
+
+import static org.junit.Assert.*;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.mdbc.MdbcTestUtils;
+import org.onap.music.mdbc.MdbcTestUtils.DBType;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.tables.StagingTable;
+
+public class PostgresMixinTest {
+    final private static String keyspace="metricmusictest";
+    final private static String mdbcServerName = "name";
+
+    static PostgresMixin mixin;
+
+    private static MusicMixin mi = null;
+    private static Connection conn;
+
+    @BeforeClass
+    public static void init() throws MDBCServiceException {
+        MdbcTestUtils.initCassandra();
+        mi=MdbcTestUtils.getMusicMixin();
+        try {
+            conn = MdbcTestUtils.getConnection(DBType.POSTGRES);
+            Properties info = new Properties();
+            mixin = new PostgresMixin(mi, null, conn, info);
+        } catch (SQLException e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    @AfterClass
+    public static void close(){
+        //TODO: shutdown cassandra
+        mixin=null;
+        MdbcTestUtils.cleanDatabase(DBType.POSTGRES);
+        MdbcTestUtils.stopCassandra();
+     }
+
+    @Test
+    public void getMixinName() {
+        final String mixinName = mixin.getMixinName();
+        assertEquals(mixinName.toLowerCase(),"postgres");
+    }
+
+    @Test
+    public void getSQLTableSet() {
+        createTestTable();
+        final Set<String> sqlTableSet = mixin.getSQLTableSet();
+        assertEquals(1,sqlTableSet.size());
+        assertTrue(sqlTableSet.contains("testtable"));
+    }
+
+    @Test
+    public void getTableInfo() {
+        createTestTable();
+        final TableInfo tableInfo = mixin.getTableInfo("testtable");
+        assertNotNull(tableInfo);
+        assertEquals(3,tableInfo.columns.size());
+        int index=0;
+        for(String col: tableInfo.columns) {
+            switch(col.toLowerCase()){
+                case "ix":
+                    assertTrue(tableInfo.iskey.get(index));
+                    assertEquals(Types.INTEGER, tableInfo.coltype.get(index).intValue());
+                    break;
+                case "test1":
+                    assertFalse(tableInfo.iskey.get(index));
+                    assertEquals(Types.CHAR, tableInfo.coltype.get(index).intValue());
+                    break;
+                case "test2":
+                    assertFalse(tableInfo.iskey.get(index));
+                    assertEquals(Types.VARCHAR, tableInfo.coltype.get(index).intValue());
+                    break;
+                default:
+                    fail();
+            }
+            index++;
+        }
+    }
+
+    private void createTestTable() {
+        try {
+            final Statement statement = conn.createStatement();
+            statement.execute("CREATE TABLE IF NOT EXISTS testtable (IX SERIAL, test1 CHAR(1), test2 VARCHAR(255), PRIMARY KEY (IX));");
+            statement.close();
+        } catch (SQLException e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    private void cleanTestTable() {
+        try {
+            final Statement statement = conn.createStatement();
+            statement.execute("DELETE FROM testtable;");
+            statement.close();
+        } catch (SQLException e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    @Test
+    public void postStatementHook() {
+        createTestTable();
+        mixin.createSQLTriggers("testtable");
+        final String sqlOperation = "INSERT INTO testtable (test1,test2) VALUES ('u','test');";
+        Statement stm=null;
+        try {
+            stm = conn.createStatement();
+        } catch (SQLException e) {
+            e.printStackTrace();
+            fail();
+        }
+        mixin.preStatementHook(sqlOperation);
+        try {
+            stm.execute(sqlOperation);
+        } catch (SQLException e) {
+            e.printStackTrace();
+            fail();
+        }
+        StagingTable st=new StagingTable();
+        mixin.postStatementHook(sqlOperation,st);
+        mixin.preCommitHook();
+        assertFalse(st.isEmpty());
+    }
+
+    void checkEmptyTestTable(){
+        ResultSet resultSet = mixin.executeSQLRead("SELECT * FROM testtable;");
+        try {
+            assertFalse(resultSet.next());
+        } catch (SQLException e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+
+    void checkOneRowWithContents(String test1Val, String test2Val){
+        ResultSet resultSet = mixin.executeSQLRead("SELECT * FROM testtable;");
+        try {
+            assertTrue(resultSet.next());
+            assertEquals(test1Val, resultSet.getString("test1"));
+            assertEquals(test2Val, resultSet.getString("test2"));
+            assertFalse(resultSet.next());
+        }
+        catch(SQLException e){
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    @Test
+    public void applyTxDigest() {
+        createTestTable();
+        mixin.createSQLTriggers("testtable");
+        final String sqlOperation = "INSERT INTO testtable (test1,test2) VALUES ('u','test');";
+        Statement stm=null;
+        try {
+            stm = conn.createStatement();
+        } catch (SQLException e) {
+            e.printStackTrace();
+            fail();
+        }
+        mixin.preStatementHook(sqlOperation);
+        try {
+            stm.execute(sqlOperation);
+        } catch (SQLException e) {
+            e.printStackTrace();
+            fail();
+        }
+        StagingTable st=new StagingTable();
+        mixin.postStatementHook(sqlOperation,st);
+        mixin.preCommitHook();
+        assertFalse(st.isEmpty());
+        cleanTestTable();
+        checkEmptyTestTable();
+        List<Range> ranges = new ArrayList<>();
+        ranges.add(new Range("public.testtable"));
+        try {
+            mixin.applyTxDigest(st,ranges);
+        } catch (SQLException|MDBCServiceException e) {
+            e.printStackTrace();
+            fail();
+        }
+        checkOneRowWithContents("u","test");
+    }
+}
\ No newline at end of file
index da64595..9e6161a 100644 (file)
@@ -59,7 +59,7 @@ public class DagTest {
     public void getDag() throws InterruptedException, MDBCServiceException {
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> ranges = new ArrayList<>( Arrays.asList(
-           new Range("range1")
+           new Range("schema.range1")
         ));
         rows.add(createNewRow(new ArrayList<>(ranges),"",false));
         MILLISECONDS.sleep(10);
@@ -87,14 +87,14 @@ public class DagTest {
     public void getDag2() throws InterruptedException, MDBCServiceException {
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> range1 = new ArrayList<>( Arrays.asList(
-           new Range("range1")
+           new Range("schema.range1")
         ));
         List<Range> range2 = new ArrayList<>( Arrays.asList(
-           new Range("range2")
+           new Range("schema.range2")
         ));
         List<Range> ranges = new ArrayList<>( Arrays.asList(
-            new Range("range2"),
-            new Range("range1")
+            new Range("schema.range2"),
+            new Range("schema.range1")
         ));
         rows.add(createNewRow(new ArrayList<>(range1),"",false));
         MILLISECONDS.sleep(10);
@@ -123,7 +123,7 @@ public class DagTest {
     public void nextToOwn() throws InterruptedException, MDBCServiceException {
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> ranges = new ArrayList<>( Arrays.asList(
-            new Range("range1")
+            new Range("schema.range1")
         ));
         rows.add(createNewRow(new ArrayList<>(ranges),"",false));
         MILLISECONDS.sleep(10);
@@ -149,20 +149,20 @@ public class DagTest {
     public void nextToApply() throws InterruptedException {
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> ranges = new ArrayList<>( Arrays.asList(
-            new Range("range1")
+            new Range("schema.range1")
         ));
         List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList(
-            new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+            new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
         ));
         rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1));
         MILLISECONDS.sleep(10);
         List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList(
-            new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+            new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
         ));
         rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo2));
         MILLISECONDS.sleep(10);
         List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
-            new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+            new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
         ));
         rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3));
         Dag dag = Dag.getDag(rows, ranges);
@@ -180,7 +180,7 @@ public class DagTest {
                 assertEquals(0,pair.getKey().index);
                 List<Range> value = pair.getValue();
                 assertEquals(1,value.size());
-                assertEquals(new Range("range1"),value.get(0));
+                assertEquals(new Range("schema.range1"),value.get(0));
                 pair = node.nextNotAppliedTransaction(rangesSet);
                 transactionCounter++;
             }
@@ -195,23 +195,23 @@ public class DagTest {
         Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> ranges = new ArrayList<>( Arrays.asList(
-            new Range("range1")
+            new Range("schema.range1")
         ));
         List<MusicTxDigestId> redo1 = new ArrayList<>(Arrays.asList(
-            new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+            new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
         ));
         rows.add(createNewRow(new ArrayList<>(ranges),"",false,redo1));
         MILLISECONDS.sleep(10);
         List<MusicTxDigestId> redo2 = new ArrayList<>(Arrays.asList(
-            new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0),
-            new MusicTxDigestId(MDBCUtils.generateUniqueKey(),1)
+            new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0),
+            new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1)
         ));
         MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2);
-        alreadyApplied.put(new Range("range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
+        alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
         rows.add(newRow);
         MILLISECONDS.sleep(10);
         List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
-            new MusicTxDigestId(MDBCUtils.generateUniqueKey(),0)
+            new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),0)
         ));
         rows.add(createNewRow(new ArrayList<>(ranges),"",true,redo3));
         Dag dag = Dag.getDag(rows, ranges);
@@ -230,7 +230,7 @@ public class DagTest {
                 assertEquals(2-nodeCounter,pair.getKey().index);
                 List<Range> value = pair.getValue();
                 assertEquals(1,value.size());
-                assertEquals(new Range("range1"),value.get(0));
+                assertEquals(new Range("schema.range1"),value.get(0));
                 pair = node.nextNotAppliedTransaction(rangesSet);
                 transactionCounter++;
             }
@@ -244,14 +244,14 @@ public class DagTest {
     public void isDifferent() throws InterruptedException {
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> range1 = new ArrayList<>( Arrays.asList(
-            new Range("range1")
+            new Range("schema.range1")
         ));
         List<Range> range2 = new ArrayList<>( Arrays.asList(
-            new Range("range2")
+            new Range("schema.range2")
         ));
         List<Range> ranges = new ArrayList<>( Arrays.asList(
-            new Range("range2"),
-            new Range("range1")
+            new Range("schema.range2"),
+            new Range("schema.range1")
         ));
         rows.add(createNewRow(new ArrayList<>(range1),"",false));
         MILLISECONDS.sleep(10);
@@ -277,14 +277,14 @@ public class DagTest {
     public void getOldestDoubles() throws InterruptedException, MDBCServiceException {
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> range1 = new ArrayList<>( Arrays.asList(
-            new Range("range1")
+            new Range("schema.range1")
         ));
         List<Range> range2 = new ArrayList<>( Arrays.asList(
-            new Range("range2")
+            new Range("schema.range2")
         ));
         List<Range> ranges = new ArrayList<>( Arrays.asList(
-            new Range("range2"),
-            new Range("range1")
+            new Range("schema.range2"),
+            new Range("schema.range1")
         ));
         rows.add(createNewRow(new ArrayList<>(range1),"",false));
         MILLISECONDS.sleep(10);
@@ -306,15 +306,15 @@ public class DagTest {
     public void getIncompleteRangesAndDependents() throws InterruptedException, MDBCServiceException {
                 List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> range1 = new ArrayList<>( Arrays.asList(
-            new Range("range1")
+            new Range("schema.range1")
         ));
         List<Range> range2 = new ArrayList<>( Arrays.asList(
-            new Range("range2"),
-            new Range("range3")
+            new Range("schema.range2"),
+            new Range("schema.range3")
         ));
         List<Range> ranges = new ArrayList<>( Arrays.asList(
-            new Range("range2"),
-            new Range("range1")
+            new Range("schema.range2"),
+            new Range("schema.range1")
         ));
         rows.add(createNewRow(new ArrayList<>(range1),"",false));
         MILLISECONDS.sleep(10);
@@ -330,7 +330,7 @@ public class DagTest {
         List<Range> incomplete = incompleteRangesAndDependents.getKey();
         Set<DagNode> dependents = incompleteRangesAndDependents.getValue();
         assertEquals(1,incomplete.size());
-        assertTrue(incomplete.contains(new Range("range3")));
+        assertTrue(incomplete.contains(new Range("schema.range3")));
         assertEquals(1,dependents.size());
         assertTrue(dependents.contains(dag.getNode(rows.get(3).getPartitionIndex())));
     }
@@ -339,16 +339,16 @@ public class DagTest {
     public void getIncompleteRangesAndDependents2() throws InterruptedException, MDBCServiceException {
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> range1 = new ArrayList<>( Arrays.asList(
-            new Range("range1"),
-            new Range("range4")
+            new Range("schema.range1"),
+            new Range("schema.range4")
         ));
         List<Range> range2 = new ArrayList<>( Arrays.asList(
-            new Range("range2"),
-            new Range("range3")
+            new Range("schema.range2"),
+            new Range("schema.range3")
         ));
         List<Range> ranges = new ArrayList<>( Arrays.asList(
-            new Range("range2"),
-            new Range("range1")
+            new Range("schema.range2"),
+            new Range("schema.range1")
         ));
         rows.add(createNewRow(new ArrayList<>(range1),"",false));
         MILLISECONDS.sleep(10);
@@ -364,8 +364,8 @@ public class DagTest {
         List<Range> incomplete = incompleteRangesAndDependents.getKey();
         Set<DagNode> dependents = incompleteRangesAndDependents.getValue();
         assertEquals(2,incomplete.size());
-        assertTrue(incomplete.contains(new Range("range3")));
-        assertTrue(incomplete.contains(new Range("range4")));
+        assertTrue(incomplete.contains(new Range("schema.range3")));
+        assertTrue(incomplete.contains(new Range("schema.range4")));
         assertEquals(2,dependents.size());
         assertTrue(dependents.contains(dag.getNode(rows.get(3).getPartitionIndex())));
         assertTrue(dependents.contains(dag.getNode(rows.get(2).getPartitionIndex())));
@@ -375,20 +375,20 @@ public class DagTest {
     public void addNewNodeWithSearch() throws InterruptedException, MDBCServiceException {
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> range1 = new ArrayList<>( Arrays.asList(
-            new Range("range1")
+            new Range("schema.range1")
         ));
         List<Range> range2 = new ArrayList<>( Arrays.asList(
-            new Range("range2"),
-            new Range("range3")
+            new Range("schema.range2"),
+            new Range("schema.range3")
         ));
         List<Range> ranges = new ArrayList<>( Arrays.asList(
-            new Range("range2"),
-            new Range("range1")
+            new Range("schema.range2"),
+            new Range("schema.range1")
         ));
         List<Range> allRanges = new ArrayList<>( Arrays.asList(
-            new Range("range2"),
-            new Range("range3"),
-            new Range("range1")
+            new Range("schema.range2"),
+            new Range("schema.range3"),
+            new Range("schema.range1")
         ));
         rows.add(createNewRow(new ArrayList<>(range1),"",false));
         MILLISECONDS.sleep(10);
index eb01bcd..f2fbd1f 100644 (file)
@@ -34,7 +34,6 @@ import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.Mockito;
@@ -46,8 +45,10 @@ import org.onap.music.exceptions.MusicServiceException;
 import org.onap.music.lockingservice.cassandra.CassaLockStore;
 import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.MdbcTestUtils.DBType;
 import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.StateManager;
+import org.onap.music.mdbc.MdbcTestUtils;
 import org.onap.music.mdbc.TestUtils;
 import org.onap.music.mdbc.mixins.LockResult;
 import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
@@ -58,13 +59,8 @@ import org.onap.music.mdbc.tables.StagingTable;
 import org.onap.music.mdbc.tables.TxCommitProgress;
 
 public class OwnershipAndCheckpointTest {
-    final private static int sqlPort = 13350;
-    final private static String keyspace="metricmusictest";
-    final private static String mriTableName = "musicrangeinformation";
-    final private static String mtdTableName = "musictxdigest";
-    final private static String mdbcServerName = "name";
-    public static final String DATABASE = "mdbcTest";
-       public static final String TABLE= "PERSONS";
+    public static final String DATABASE = MdbcTestUtils.mariaDBDatabaseName;
+       public static final String TABLE= MdbcTestUtils.mariaDBDatabaseName+".PERSONS";
        public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" +
             "    PersonID int,\n" +
             "    LastName varchar(255),\n" +
@@ -75,11 +71,7 @@ public class OwnershipAndCheckpointTest {
             ");";
        public static final String DROP_TABLE = "DROP TABLE IF EXISTS " + TABLE + ";";
     //Properties used to connect to music
-    private static Cluster cluster;
-    private static Session session;
-    private static String cassaHost = "localhost";
     private static MusicMixin musicMixin = null;
-    private static DB db;
        Connection conn;
        MySQLMixin mysqlMixin;
        OwnershipAndCheckpoint ownAndCheck;
@@ -90,37 +82,18 @@ public class OwnershipAndCheckpointTest {
 
     @BeforeClass
     public static void init() throws MusicServiceException, ClassNotFoundException, ManagedProcessException {
-        try {
-            EmbeddedCassandraServerHelper.startEmbeddedCassandra();
-        } catch (Exception e) {
-            fail(e.getMessage());
-        }
-        cluster=EmbeddedCassandraServerHelper.getCluster();
-        //cluster = new Cluster.Builder().addContactPoint(cassaHost).withPort(9142).build();
-        cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(20000);
-        assertNotNull("Invalid configuration for cassandra", cluster);
-        session = EmbeddedCassandraServerHelper.getSession();
-        assertNotNull("Invalid configuration for cassandra", session);
+        MdbcTestUtils.initCassandra();
         Class.forName("org.mariadb.jdbc.Driver");
-        MusicDataStoreHandle.mDstoreHandle = new MusicDataStore(cluster, session);
-        CassaLockStore store = new CassaLockStore(MusicDataStoreHandle.mDstoreHandle);
-        assertNotNull("Invalid configuration for music", store);
                //start embedded mariadb
-               db = DB.newEmbeddedDB(sqlPort);
-               db.start();
-               db.createDB(DATABASE);
+        MdbcTestUtils.startMariaDb();
     }
 
     @AfterClass
     public static void close() throws MusicServiceException, MusicQueryException, ManagedProcessException {
         //TODO: shutdown cassandra
         musicMixin=null;
-        db.stop();
-        try {
-            EmbeddedCassandraServerHelper.cleanEmbeddedCassandra();
-        }
-        catch(NullPointerException e){
-        }
+        MdbcTestUtils.cleanDatabase(DBType.MySQL);
+        MdbcTestUtils.stopCassandra();
     }
 
     private void dropTable() throws SQLException {
@@ -144,25 +117,34 @@ public class OwnershipAndCheckpointTest {
 
     @Before
     public void initTest() throws SQLException {
-        session.execute("DROP KEYSPACE IF EXISTS "+keyspace);
+        MdbcTestUtils.getSession().execute("DROP KEYSPACE IF EXISTS "+MdbcTestUtils.getKeyspace());
         try {
             Properties properties = new Properties();
+/*
             properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName);
             properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace);
             properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1");
             //StateManager stateManager = new StateManager("dbUrl", properties, "serverName", "dbName");
             ownAndCheck = new OwnershipAndCheckpoint();
             musicMixin =new MusicMixin(stateManager, mdbcServerName,properties);
+*/
+            properties.setProperty(MusicMixin.KEY_MY_ID,MdbcTestUtils.getServerName());
+            properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,MdbcTestUtils.getKeyspace());
+            properties.setProperty(MusicMixin.KEY_MUSIC_RFACTOR,"1");
+            properties.setProperty(MusicMixin.KEY_MUSIC_ADDRESS,MdbcTestUtils.getCassandraUrl());
+            ownAndCheck = new OwnershipAndCheckpoint();
+            musicMixin =new MusicMixin(stateManager, MdbcTestUtils.getServerName(), properties);
         } catch (MDBCServiceException e) {
             fail("error creating music musicMixin " + e.getMessage());
         }
-        this.conn = DriverManager.getConnection("jdbc:mariadb://localhost:"+sqlPort+"/"+DATABASE, "root", "");
-        this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+sqlPort+"/"+DATABASE, conn, null);
+        this.conn = MdbcTestUtils.getConnection(DBType.MySQL);
+        Properties info = new Properties();
+        this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+MdbcTestUtils.getMariaDbPort()+"/"+DATABASE, conn, info);
         dropAndCreateTable();
     }
 
     private void initDatabase(Range range) throws MDBCServiceException, SQLException {
-        final DatabasePartition partition = TestUtils.createBasicRow(range, musicMixin, mdbcServerName);
+        final DatabasePartition partition = TestUtils.createBasicRow(range, musicMixin, MdbcTestUtils.getServerName());
         String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+
             "(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');";
         StagingTable stagingTable = new StagingTable();
@@ -171,13 +153,15 @@ public class OwnershipAndCheckpointTest {
         executeStatement.execute(sqlOperation);
         this.conn.commit();
         mysqlMixin.postStatementHook(sqlOperation,stagingTable);
+        mysqlMixin.preCommitHook();
         executeStatement.close();
         String id = MDBCUtils.generateUniqueKey().toString();
         TxCommitProgress progressKeeper = new TxCommitProgress();
         progressKeeper.createNewTransactionTracker(id ,this.conn);
         musicMixin.commitLog(partition, null, stagingTable, id, progressKeeper);
         try {
-            TestUtils.unlockRow(keyspace, mriTableName, partition);
+//            TestUtils.unlockRow(keyspace, mriTableName, partition);
+            TestUtils.unlockRow(MdbcTestUtils.getKeyspace(), MdbcTestUtils.getMriTableName(), partition);
         }
         catch(Exception e){
             fail(e.getMessage());