Finish adding METRIC code 89/70689/1
authorTschaen, Brendan <ctschaen@att.com>
Wed, 17 Oct 2018 21:26:32 +0000 (17:26 -0400)
committerTschaen, Brendan <ctschaen@att.com>
Wed, 17 Oct 2018 21:26:52 +0000 (17:26 -0400)
Change-Id: Ifd0307ac21f85e504d690c79080174a50af87f9e
Issue-ID: MUSIC-149
Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
30 files changed:
.gitreview [new file with mode: 0644]
src/main/java/com/att/research/mdbc/ArchiveProcess.java
src/main/java/com/att/research/mdbc/DatabaseOperations.java
src/main/java/com/att/research/mdbc/DatabasePartition.java
src/main/java/com/att/research/mdbc/MDBCUtils.java
src/main/java/com/att/research/mdbc/MdbcConnection.java
src/main/java/com/att/research/mdbc/MusicSqlManager.java
src/main/java/com/att/research/mdbc/StateManager.java
src/main/java/com/att/research/mdbc/configurations/NodeConfiguration.java
src/main/java/com/att/research/mdbc/configurations/TablesConfiguration.java
src/main/java/com/att/research/mdbc/configurations/config-0.json
src/main/java/com/att/research/mdbc/configurations/ranges.json
src/main/java/com/att/research/mdbc/configurations/tableConfiguration.json
src/main/java/com/att/research/mdbc/mixins/CassandraMixin.java
src/main/java/com/att/research/mdbc/mixins/DBInterface.java
src/main/java/com/att/research/mdbc/mixins/MusicInterface.java
src/main/java/com/att/research/mdbc/mixins/MusicMixin.java
src/main/java/com/att/research/mdbc/mixins/MySQLMixin.java
src/main/java/com/att/research/mdbc/tables/Operation.java [new file with mode: 0644]
src/main/java/com/att/research/mdbc/tables/OperationType.java [new file with mode: 0644]
src/main/java/com/att/research/mdbc/tables/PartitionInformation.java [new file with mode: 0644]
src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java [new file with mode: 0644]
src/main/java/com/att/research/mdbc/tables/RedoRecordId.java [new file with mode: 0644]
src/main/java/com/att/research/mdbc/tables/StagingTable.java [new file with mode: 0644]
src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java [new file with mode: 0644]
src/main/java/com/att/research/mdbc/tables/TitReference.java [new file with mode: 0644]
src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java [new file with mode: 0644]
src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java [new file with mode: 0644]
src/main/java/com/att/research/mdbc/tools/CreatePartition.java
src/test/java/com/att/research/mdbc/MDBCUtilsTest.java

diff --git a/.gitreview b/.gitreview
new file mode 100644 (file)
index 0000000..e67571d
--- /dev/null
@@ -0,0 +1,4 @@
+[gerrit]
+host=gerrit.onap.org
+port=29418
+project=music/mdbc.git
index f192430..8290d66 100644 (file)
@@ -12,10 +12,9 @@ public class ArchiveProcess {
        //TODO: This is a place holder for taking snapshots and moving data from redo record into actual tables
        
        /**
-        * This method is called whenever there is a DELETE on a local SQL table, and should be called by the underlying databases
-        * triggering mechanism. It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL DELETE.
-        * Music propagates it to the other replicas.  If the local database is in the middle of a transaction, the DELETEs to MUSIC are
-        * delayed until the transaction is either committed or rolled back.
+        * This method is called whenever there is a DELETE on the transaction digest and should be called when ownership changes, if required
+        *  It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL DELETE.
+        * Music propagates it to the other replicas.
         * @param tableName This is the table on which the select is being performed
         * @param oldRow This is information about the row that is being deleted
         */
@@ -26,8 +25,8 @@ public class ArchiveProcess {
        }
        
        /**
-        * This method is called whenever there is an INSERT or UPDATE to a local SQL table, and should be called by the underlying databases
-        * triggering mechanism. It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write.
+        * This method is called whenever there is an INSERT or UPDATE to a the transaction digest, and should be called by an
+        * ownership chance. It updates the MUSIC/Cassandra tables (both dirty bits and actual data) corresponding to the SQL write.
         * Music propagates it to the other replicas.  If the local database is in the middle of a transaction, the updates to MUSIC are
         * delayed until the transaction is either committed or rolled back.
         *
index 406152e..bd10928 100644 (file)
@@ -263,64 +263,9 @@ public class DatabaseOperations {
         return id;
     }
 
-       /**
-        * This function creates the TransactionInformation table. It contain information related
-        * to the transactions happening in a given partition.
-        *       * The schema of the table is
-        *              * Id, uiid.
-        *              * Partition, uuid id of the partition
-        *              * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
-        *              * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
-        *              * Redo: list of uiids associated to the Redo Records Table
-        *
-        */
-       public static void CreateTransactionInformationTable( String musicNamespace, String transactionInformationTableName) throws MDBCServiceException {
-               String tableName = transactionInformationTableName;
-               String priKey = "id";
-               StringBuilder fields = new StringBuilder();
-               fields.append("id uuid, ");
-               fields.append("partition uuid, ");
-               fields.append("latestapplied int, ");
-               fields.append("applied boolean, ");
-               //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
-               fields.append("redo list<frozen<tuple<text,tuple<text,varint>>>> ");
-               String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
-        try {
-            executeMusicWriteQuery(musicNamespace,tableName,cql);
-        } catch (MDBCServiceException e) {
-            logger.error("Initialization error: Failure to create transaction information table");
-            throw(e);
-        }
-    }
 
-       /**
-        * This function creates the RedoRecords table. It contain information related to each transaction committed
-        *      * LeaseId: id associated with the lease, text
-        *      * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
-        *  * TransactionDigest: text that contains all the changes in the transaction
-        */
-       public static void CreateRedoRecordsTable(int redoTableNumber, String musicNamespace, String redoRecordTableName) throws MDBCServiceException {
-               String tableName = redoRecordTableName;
-               if(redoTableNumber >= 0) {
-                       StringBuilder table = new StringBuilder();
-                       table.append(tableName);
-                       table.append("-");
-                       table.append(Integer.toString(redoTableNumber));
-                       tableName=table.toString();
-               }
-               String priKey = "leaseid,leasecounter";
-               StringBuilder fields = new StringBuilder();
-               fields.append("leaseid text, ");
-               fields.append("leasecounter varint, ");
-               fields.append("transactiondigest text ");//notice lack of ','
-               String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
-        try {
-            executeMusicWriteQuery(musicNamespace,tableName,cql);
-        } catch (MDBCServiceException e) {
-            logger.error("Initialization error: Failure to create redo records table");
-            throw(e);
-        }
-    }
+
+
 
        /**
         * This function creates the Table To Partition table. It contain information related to
@@ -440,4 +385,65 @@ public class DatabaseOperations {
             }
         }
     }
+
+    /**
+     * This function creates the MusicTxDigest table. It contain information related to each transaction committed
+     *         * LeaseId: id associated with the lease, text
+     *         * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+     *  * TransactionDigest: text that contains all the changes in the transaction
+     */
+    public static void CreateMusicTxDigest(int musicTxDigestTableNumber, String musicNamespace, String musicTxDigestTableName) throws MDBCServiceException {
+        String tableName = musicTxDigestTableName;
+        if(musicTxDigestTableNumber >= 0) {
+            StringBuilder table = new StringBuilder();
+            table.append(tableName);
+            table.append("-");
+            table.append(Integer.toString(musicTxDigestTableNumber));
+            tableName=table.toString();
+        }
+        String priKey = "leaseid,leasecounter";
+        StringBuilder fields = new StringBuilder();
+        fields.append("leaseid text, ");
+        fields.append("leasecounter varint, ");
+        fields.append("transactiondigest text ");//notice lack of ','
+        String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
+        try {
+            executeMusicWriteQuery(musicNamespace,tableName,cql);
+        } catch (MDBCServiceException e) {
+            logger.error("Initialization error: Failure to create redo records table");
+            throw(e);
+        }
+    }
+
+    /**
+     * This function creates the TransactionInformation table. It contain information related
+     * to the transactions happening in a given partition.
+     *          * The schema of the table is
+     *                 * Id, uiid.
+     *                 * Partition, uuid id of the partition
+     *                 * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
+     *         * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
+     *         * Redo: list of uiids associated to the Redo Records Table
+     *
+     */
+    public static void CreateMusicRangeInformationTable(String musicNamespace, String musicRangeInformationTableName) throws MDBCServiceException {
+        String tableName = musicRangeInformationTableName;
+        String priKey = "id";
+        StringBuilder fields = new StringBuilder();
+        fields.append("id uuid, ");
+        fields.append("partition uuid, ");
+        fields.append("latestapplied int, ");
+        fields.append("applied boolean, ");
+        //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
+        fields.append("redo list<frozen<tuple<text,tuple<text,varint>>>> ");
+        String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
+        try {
+            executeMusicWriteQuery(musicNamespace,tableName,cql);
+        } catch (MDBCServiceException e) {
+            logger.error("Initialization error: Failure to create transaction information table");
+            throw(e);
+        }
+    }
+
+
 }
index 6046801..a9b4f3e 100644 (file)
@@ -7,7 +7,6 @@ import java.util.HashSet;
 import java.util.Set;
 
 import com.att.research.logging.EELFLoggerDelegate;
-import com.att.research.mdbc.mixins.CassandraMixin;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 
@@ -19,9 +18,9 @@ import com.google.gson.GsonBuilder;
 public class DatabasePartition {
        private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(DatabasePartition.class);
 
-       private String transactionInformationTable;//Table that currently contains the REDO log for this partition
-       private String transactionInformationIndex;//Index that can be obtained either from
-       private String redoRecordsTable;
+       private String musicRangeInformationTable;//Table that currently contains the REDO log for this partition
+       private String musicRangeInformationIndex;//Index that can be obtained either from
+       private String musicTxDigestTable;
        private String partitionId;
        private String lockId;
        protected Set<Range> ranges;
@@ -35,7 +34,7 @@ public class DatabasePartition {
                ranges = new HashSet<>();
        }
        
-       public DatabasePartition(Set<Range> knownRanges, String titIndex, String titTable, String partitionId, String lockId, String redoRecordsTable) {
+       public DatabasePartition(Set<Range> knownRanges, String mriIndex, String mriTable, String partitionId, String lockId, String musicTxDigestTable) {
                if(knownRanges != null) {
                        ranges = knownRanges;
                }
@@ -43,25 +42,25 @@ public class DatabasePartition {
                        ranges = new HashSet<>();
                }
 
-               if(redoRecordsTable != null) {
-            this.setRedoRecordsTable(redoRecordsTable);
+               if(musicTxDigestTable != null) {
+            this.setMusicTxDigestTable(musicTxDigestTable);
         }
         else{
-            this.setRedoRecordsTable("");
+            this.setMusicTxDigestTable("");
         }
 
-               if(titIndex != null) {
-                       this.setTransactionInformationIndex(titIndex);
+               if(mriIndex != null) {
+                       this.setMusicRangeInformationIndex(mriIndex);
                }
                else {
-                       this.setTransactionInformationIndex("");
+                       this.setMusicRangeInformationIndex("");
                }
                
-               if(titTable != null) {
-                       this.setTransactionInformationTable(titTable);
+               if(mriTable != null) {
+                       this.setMusicRangeInformationTable(mriTable);
                }
                else {
-                       this.setTransactionInformationTable("");
+                       this.setMusicRangeInformationTable("");
                }
                
                if(partitionId != null) {
@@ -79,20 +78,20 @@ public class DatabasePartition {
                }       
        }
 
-       public String getTransactionInformationTable() {
-               return transactionInformationTable;
+       public String getMusicRangeInformationTable() {
+               return musicRangeInformationTable;
        }
 
-       public void setTransactionInformationTable(String transactionInformationTable) {
-               this.transactionInformationTable = transactionInformationTable;
+       public void setMusicRangeInformationTable(String musicRangeInformationTable) {
+               this.musicRangeInformationTable = musicRangeInformationTable;
        }
 
-       public String getTransactionInformationIndex() {
-               return transactionInformationIndex;
+       public String getMusicRangeInformationIndex() {
+               return musicRangeInformationIndex;
        }
 
-       public void setTransactionInformationIndex(String transactionInformationIndex) {
-               this.transactionInformationIndex = transactionInformationIndex;
+       public void setMusicRangeInformationIndex(String musicRangeInformationIndex) {
+               this.musicRangeInformationIndex = musicRangeInformationIndex;
        }
 
        /**
@@ -180,11 +179,11 @@ public class DatabasePartition {
                this.lockId = lockId;
        }
 
-    public String getRedoRecordsTable() {
-        return redoRecordsTable;
+    public String getMusicTxDigestTable() {
+        return musicTxDigestTable;
     }
 
-    public void setRedoRecordsTable(String redoRecordsTable) {
-        this.redoRecordsTable = redoRecordsTable;
+    public void setMusicTxDigestTable(String musicTxDigestTable) {
+        this.musicTxDigestTable = musicTxDigestTable;
     }
 }
index 411be8d..34f4b10 100644 (file)
@@ -2,26 +2,23 @@ package com.att.research.mdbc;
 
 import java.io.*;
 import java.util.Base64;
+import java.util.Deque;
+import java.util.HashMap;
 
 import com.att.research.logging.EELFLoggerDelegate;
 import com.att.research.logging.format.AppMessages;
 import com.att.research.logging.format.ErrorSeverity;
 import com.att.research.logging.format.ErrorTypes;
+import com.att.research.mdbc.tables.Operation;
+import com.att.research.mdbc.tables.StagingTable;
+
+import javassist.bytecode.Descriptor.Iterator;
+
+import org.apache.commons.lang3.tuple.Pair;
 import org.json.JSONObject;
 
 public class MDBCUtils {
-    /** Read the object from Base64 string. */
-   public static Object fromString( String s ) throws IOException ,
-                                                       ClassNotFoundException {
-        byte [] data = Base64.getDecoder().decode( s );
-        ObjectInputStream ois = new ObjectInputStream( 
-                                        new ByteArrayInputStream(  data ) );
-        Object o  = ois.readObject();
-        ois.close();
-        return o;
-   }
-
-    /** Write the object to a Base64 string. */
+        /** Write the object to a Base64 string. */
     public static String toString( Serializable o ) throws IOException {
        //TODO We may want to also compress beside serialize
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -44,6 +41,17 @@ public class MDBCUtils {
         oos.close();
         return Base64.getEncoder().encodeToString(baos.toByteArray());
     }
+    
+    /** Read the object from Base64 string. */
+    public static Object fromString( String s ) throws IOException ,
+                                                        ClassNotFoundException {
+         byte [] data = Base64.getDecoder().decode( s );
+         ObjectInputStream ois = new ObjectInputStream( 
+                                         new ByteArrayInputStream(  data ) );
+         Object o  = ois.readObject();
+         ois.close();
+         return o;
+    }
 
     public static void saveToFile(String serializedContent, String filename, EELFLoggerDelegate logger) throws IOException {
         try (PrintWriter fout = new PrintWriter(filename)) {
index d471522..1e845fd 100644 (file)
@@ -26,7 +26,7 @@ import com.att.research.logging.format.AppMessages;
 import com.att.research.logging.format.ErrorSeverity;
 import com.att.research.logging.format.ErrorTypes;
 import com.att.research.mdbc.mixins.MusicInterface;
-import com.att.research.mdbc.mixins.TxCommitProgress;
+import com.att.research.mdbc.tables.TxCommitProgress;
 
 
 /**
index 4330cfe..e32a969 100755 (executable)
@@ -1,22 +1,16 @@
 package com.att.research.mdbc;
 
 import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
 
 import org.json.JSONObject;
 
 import com.att.research.mdbc.mixins.DBInterface;
 import com.att.research.mdbc.mixins.MixinFactory;
 import com.att.research.mdbc.mixins.MusicInterface;
-import com.att.research.mdbc.mixins.StagingTable;
-import com.att.research.mdbc.mixins.TxCommitProgress;
 import com.att.research.mdbc.mixins.Utils;
-
+import com.att.research.mdbc.tables.StagingTable;
+import com.att.research.mdbc.tables.TxCommitProgress;
 import com.att.research.exceptions.MDBCServiceException;
 import com.att.research.exceptions.QueryException;
 import com.att.research.logging.*;
@@ -41,117 +35,121 @@ import com.att.research.logging.format.ErrorTypes;
 */
 public class MusicSqlManager {
 
-       private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicSqlManager.class);
-       
-       private final DBInterface dbi;
-       private final MusicInterface mi;
-       private final Set<String> table_set;
-       private final HashMap<Range,StagingTable> transactionDigest;
-       private boolean autocommit;                     // a copy of the autocommit flag from the JDBC Connection
+    private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicSqlManager.class);
 
-       /**
-        * Build a MusicSqlManager for a DB connection.  This construct may only be called by getMusicSqlManager(),
-        * which will ensure that only one MusicSqlManager is created per URL.
-        * This is the location where the appropriate mixins to use for the MusicSqlManager should be determined.
-        * They should be picked based upon the URL and the properties passed to this constructor.
-        * <p>
-        * At the present time, we only support the use of the H2Mixin (for access to a local H2 database),
-        * with the CassandraMixin (for direct access to a Cassandra noSQL DB as the persistence layer).
-        * </p>
-        *
-        * @param url the JDBC URL which was used to connection to the database
-        * @param conn the actual connection to the database
-        * @param info properties passed from the initial JDBC connect() call
-        * @throws MDBCServiceException 
-        */
-        public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException {
-               try {
-                       info.putAll(Utils.getMdbcProperties());
-                       String mixinDb  = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
-                       this.dbi       = MixinFactory.createDBInterface(mixinDb, this, url, conn, info);
-                       this.mi = mi;
-                       this.table_set = Collections.synchronizedSet(new HashSet<String>());
-                       this.autocommit = true;
-                       this.transactionDigest = new HashMap<Range,StagingTable>();
+    private final DBInterface dbi;
+    private final MusicInterface mi;
+    private final Set<String> table_set;
+    private final HashMap<Range, StagingTable> transactionDigest;
+    private boolean autocommit;            // a copy of the autocommit flag from the JDBC Connection
 
-               }catch(Exception e) {
-                       throw new MDBCServiceException(e.getMessage());
-               }
-       }
+    /**
+     * Build a MusicSqlManager for a DB connection.  This construct may only be called by getMusicSqlManager(),
+     * which will ensure that only one MusicSqlManager is created per URL.
+     * This is the location where the appropriate mixins to use for the MusicSqlManager should be determined.
+     * They should be picked based upon the URL and the properties passed to this constructor.
+     * <p>
+     * At the present time, we only support the use of the H2Mixin (for access to a local H2 database),
+     * with the CassandraMixin (for direct access to a Cassandra noSQL DB as the persistence layer).
+     * </p>
+     *
+     * @param url  the JDBC URL which was used to connection to the database
+     * @param conn the actual connection to the database
+     * @param info properties passed from the initial JDBC connect() call
+     * @throws MDBCServiceException
+     */
+    public MusicSqlManager(String url, Connection conn, Properties info, MusicInterface mi) throws MDBCServiceException {
+        try {
+            info.putAll(Utils.getMdbcProperties());
+            String mixinDb = info.getProperty(Configuration.KEY_DB_MIXIN_NAME, Configuration.DB_MIXIN_DEFAULT);
+            this.dbi = MixinFactory.createDBInterface(mixinDb, this, url, conn, info);
+            this.mi = mi;
+            this.table_set = Collections.synchronizedSet(new HashSet<String>());
+            this.autocommit = true;
+            this.transactionDigest = new HashMap<Range, StagingTable>();
 
-       public void setAutoCommit(boolean b,String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
-               if (b != autocommit) {
-                       autocommit = b;
-                       logger.debug(EELFLoggerDelegate.applicationLogger,"autocommit changed to "+b);
-                       if (b) {
-                               // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
-                               if(txId == null || txId.isEmpty()) {
-                                       logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null",AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
-                                       throw new MDBCServiceException("tx id is null");
-                               }
-                               commit(txId,progressKeeper,partition);
-                       }
-               }
-       }
+        } catch (Exception e) {
+            throw new MDBCServiceException(e.getMessage());
+        }
+    }
 
-       /**
-        * Close this MusicSqlManager.
-        */
-       public void close() {
-               if (dbi != null) {
-                       dbi.close();
-               }
-       }
+    public void setAutoCommit(boolean b, String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
+        if (b != autocommit) {
+            autocommit = b;
+            logger.debug(EELFLoggerDelegate.applicationLogger, "autocommit changed to " + b);
+            if (b) {
+                // My reading is that turning autoCOmmit ON should automatically commit any outstanding transaction
+                if (txId == null || txId.isEmpty()) {
+                    logger.error(EELFLoggerDelegate.errorLogger, "Connection ID is null", AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+                    throw new MDBCServiceException("tx id is null");
+                }
+                commit(txId, progressKeeper, partition);
+            }
+        }
+    }
 
-       /**
-        * Code to be run within the DB driver before a SQL statement is executed.  This is where tables
-        * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
-        * @param sql the SQL statement that is about to be executed
-        */
-       public void preStatementHook(final String sql) {
-               dbi.preStatementHook(sql);
-       }
-       /**
-        * Code to be run within the DB driver after a SQL statement has been executed.  This is where remote
-        * statement actions can be copied back to Cassandra/MUSIC.
-        * @param sql the SQL statement that was executed
-        */
-       public void postStatementHook(final String sql) {
-               dbi.postStatementHook(sql,transactionDigest);
-       }
-       /**
-        * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
-        * proxy first starts, and whenever there is the possibility that tables were created or dropped.  It is synchronized
-        * in order to prevent multiple threads from running this code in parallel.
-        */
-       public synchronized void synchronizeTables() throws QueryException {
-                       Set<String> set1 = dbi.getSQLTableSet();        // set of tables in the database
-                       logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
-                       for (String tableName : set1) {
-                               // This map will be filled in if this table was previously discovered
-                               if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
-                                       logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: "+tableName);
-                                       try {
-                                               TableInfo ti = dbi.getTableInfo(tableName);
-                                               mi.initializeMusicForTable(ti,tableName);
-                                               //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
-                                               ti = dbi.getTableInfo(tableName);
-                                               mi.createDirtyRowTable(ti,tableName);
-                                               dbi.createSQLTriggers(tableName);
-                                               table_set.add(tableName);
-                                               synchronizeTableData(tableName);
-                                               logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
-                                                                       table_set.size() + "/" + set1.size() + "tables uploaded");
-                                       } catch (Exception e) {
-                                               logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
-                                               //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e);
-                                               throw new QueryException();
-                                       }
-                               }
-                       }
+    /**
+     * Close this MusicSqlManager.
+     */
+    public void close() {
+        if (dbi != null) {
+            dbi.close();
+        }
+    }
+
+    /**
+     * Code to be run within the DB driver before a SQL statement is executed.  This is where tables
+     * can be synchronized before a SELECT, for those databases that do not support SELECT triggers.
+     *
+     * @param sql the SQL statement that is about to be executed
+     */
+    public void preStatementHook(final String sql) {
+        dbi.preStatementHook(sql);
+    }
+
+    /**
+     * Code to be run within the DB driver after a SQL statement has been executed.  This is where remote
+     * statement actions can be copied back to Cassandra/MUSIC.
+     *
+     * @param sql the SQL statement that was executed
+     */
+    public void postStatementHook(final String sql) {
+        dbi.postStatementHook(sql, transactionDigest);
+    }
+
+    /**
+     * Synchronize the list of tables in SQL with the list in MUSIC. This function should be called when the
+     * proxy first starts, and whenever there is the possibility that tables were created or dropped.  It is synchronized
+     * in order to prevent multiple threads from running this code in parallel.
+     */
+    public synchronized void synchronizeTables() throws QueryException {
+        Set<String> set1 = dbi.getSQLTableSet();    // set of tables in the database
+        logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
+        for (String tableName : set1) {
+            // This map will be filled in if this table was previously discovered
+            if (!table_set.contains(tableName) && !dbi.getReservedTblNames().contains(tableName)) {
+                logger.info(EELFLoggerDelegate.applicationLogger, "New table discovered: " + tableName);
+                try {
+                    TableInfo ti = dbi.getTableInfo(tableName);
+                    mi.initializeMusicForTable(ti, tableName);
+                    //\TODO Verify if table info can be modify in the previous step, if not this step can be deleted
+                    ti = dbi.getTableInfo(tableName);
+                    mi.createDirtyRowTable(ti, tableName);
+                    dbi.createSQLTriggers(tableName);
+                    table_set.add(tableName);
+                    synchronizeTableData(tableName);
+                    logger.debug(EELFLoggerDelegate.applicationLogger, "synchronized tables:" +
+                            table_set.size() + "/" + set1.size() + "tables uploaded");
+                } catch (Exception e) {
+                    logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.QUERYERROR);
+                    //logger.error(EELFLoggerDelegate.errorLogger, "Exception synchronizeTables: "+e);
+                    throw new QueryException();
+                }
+            }
+        }
 
 //                     Set<String> set2 = getMusicTableSet(music_ns);
-                       // not working - fix later
+        // not working - fix later
 //                     for (String tbl : set2) {
 //                             if (!set1.contains(tbl)) {
 //                                     logger.debug("Old table dropped: "+tbl);
@@ -159,101 +157,107 @@ public class MusicSqlManager {
 //                                     // ZZTODO drop camunda table ?
 //                             }
 //                     }
-       }
+    }
+
+    /**
+     * On startup, copy dirty data from Cassandra to H2. May not be needed.
+     *
+     * @param tableName
+     */
+    public void synchronizeTableData(String tableName) {
+        // TODO - copy MUSIC -> H2
+        dbi.synchronizeData(tableName);
+    }
+
+    /**
+     * This method is called whenever there is a SELECT on a local SQL table, and should be called by the underlying databases
+     * triggering mechanism.  It first checks the local dirty bits table to see if there are any keys in Cassandra whose value
+     * has not yet been sent to SQL.  If there are, the appropriate values are copied from Cassandra to the local database.
+     * Under normal execution, this function behaves as a NOP operation.
+     *
+     * @param tableName This is the table on which the SELECT is being performed
+     */
+    public void readDirtyRowsAndUpdateDb(String tableName) {
+        mi.readDirtyRowsAndUpdateDb(dbi, tableName);
+    }
+
+
+    /**
+     * This method gets the primary key that the music interfaces uses by default.
+     * If the front end uses a primary key, this will not match what is used in the MUSIC interface
+     *
+     * @return
+     */
+    public String getMusicDefaultPrimaryKeyName() {
+        return mi.getMusicDefaultPrimaryKeyName();
+    }
+
+    /**
+     * Asks music interface to provide the function to create a primary key
+     * e.g. uuid(), 1, "unique_aksd419fjc"
+     *
+     * @return
+     */
+    public String generateUniqueKey() {
+        //
+        return mi.generateUniqueKey();
+    }
 
-       /**
-        * On startup, copy dirty data from Cassandra to H2. May not be needed.
-        * @param tableName 
-        */
-       public void synchronizeTableData(String tableName) {
-               // TODO - copy MUSIC -> H2
-               dbi.synchronizeData(tableName);
-       }
-       /**
-        * This method is called whenever there is a SELECT on a local SQL table, and should be called by the underlying databases
-        * triggering mechanism.  It first checks the local dirty bits table to see if there are any keys in Cassandra whose value
-        * has not yet been sent to SQL.  If there are, the appropriate values are copied from Cassandra to the local database.
-        * Under normal execution, this function behaves as a NOP operation.
-        * @param tableName This is the table on which the SELECT is being performed
-        */
-       public void readDirtyRowsAndUpdateDb(String tableName) {
-               mi.readDirtyRowsAndUpdateDb(dbi,tableName);
-       }
-       
-       
-       
-       
-       /**
-        * This method gets the primary key that the music interfaces uses by default.
-        * If the front end uses a primary key, this will not match what is used in the MUSIC interface
-        * @return
-        */
-       public String getMusicDefaultPrimaryKeyName() {
-               return mi.getMusicDefaultPrimaryKeyName();
-       }
-       
-       /**
-        * Asks music interface to provide the function to create a primary key
-        * e.g. uuid(), 1, "unique_aksd419fjc"
-        * @return
-        */
-       public String generateUniqueKey() {
-               // 
-               return mi.generateUniqueKey();
-       }
-       
-       
-       /**
-        * Perform a commit, as requested by the JDBC driver.  If any row updates have been delayed,
-        * they are performed now and copied into MUSIC.
-        * @throws MDBCServiceException 
-        */
-       public synchronized void commit(String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
-               logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
-               // transaction was committed -- add all the updates into the REDO-Log in MUSIC
-               try {
-                       mi.commitLog(dbi, partition, transactionDigest, txId, progressKeeper);
-               }catch(MDBCServiceException e) {
-                       logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
-                       throw e;
-               }
-       }
 
-       /**
-        * Perform a rollback, as requested by the JDBC driver.  If any row updates have been delayed,
-        * they are discarded.
-        */
-       public synchronized void rollback() {
-               // transaction was rolled back - discard the updates
-               logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");;
-               transactionDigest.clear();
-       }
+    /**
+     * Perform a commit, as requested by the JDBC driver.  If any row updates have been delayed,
+     * they are performed now and copied into MUSIC.
+     *
+     * @throws MDBCServiceException
+     */
+    public synchronized void commit(String txId, TxCommitProgress progressKeeper, DatabasePartition partition) throws MDBCServiceException {
+        logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
+        // transaction was committed -- add all the updates into the REDO-Log in MUSIC
+        try {
+            mi.commitLog(dbi, partition, transactionDigest, txId, progressKeeper);
+        } catch (MDBCServiceException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(), AppMessages.QUERYERROR, ErrorTypes.QUERYERROR, ErrorSeverity.CRITICAL);
+            throw e;
+        }
+    }
 
-       /**
-        * Get all 
-        * @param table
-        * @param dbRow
-        * @return
-        */
-       public String getMusicKeyFromRowWithoutPrimaryIndexes(String table, JSONObject dbRow) {
-               TableInfo ti = dbi.getTableInfo(table);
-               return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti,table, dbRow);
-       }
-       
-       public String getMusicKeyFromRow(String table, JSONObject dbRow) {
-               TableInfo ti = dbi.getTableInfo(table);
-               return mi.getMusicKeyFromRow(ti,table, dbRow);
-       }
-       
-       /**
-        * Returns all keys that matches the current sql statement, and not in already updated keys.
-        * 
-        * @param sql the query that we are getting keys for
-        * @deprecated
-        */
-       public ArrayList<String> getMusicKeys(String sql) {
-               ArrayList<String> musicKeys = new ArrayList<String>();
-               //\TODO See if this is required
+    /**
+     * Perform a rollback, as requested by the JDBC driver.  If any row updates have been delayed,
+     * they are discarded.
+     */
+    public synchronized void rollback() {
+        // transaction was rolled back - discard the updates
+        logger.debug(EELFLoggerDelegate.applicationLogger, "Rollback");
+        ;
+        transactionDigest.clear();
+    }
+
+    /**
+     * Get all
+     *
+     * @param table
+     * @param dbRow
+     * @return
+     */
+    public String getMusicKeyFromRowWithoutPrimaryIndexes(String table, JSONObject dbRow) {
+        TableInfo ti = dbi.getTableInfo(table);
+        return mi.getMusicKeyFromRowWithoutPrimaryIndexes(ti, table, dbRow);
+    }
+
+    public String getMusicKeyFromRow(String table, JSONObject dbRow) {
+        TableInfo ti = dbi.getTableInfo(table);
+        return mi.getMusicKeyFromRow(ti, table, dbRow);
+    }
+
+    /**
+     * Returns all keys that matches the current sql statement, and not in already updated keys.
+     *
+     * @param sql the query that we are getting keys for
+     * @deprecated
+     */
+    public ArrayList<String> getMusicKeys(String sql) {
+        ArrayList<String> musicKeys = new ArrayList<String>();
+        //\TODO See if this is required
                /*
                try {
                        net.sf.jsqlparser.statement.Statement stmt = CCJSqlParserUtil.parse(sql);
@@ -295,6 +299,18 @@ public class MusicSqlManager {
                        System.out.print(musicKey + ",");
                }
                */
-               return musicKeys;
-       }
+        return musicKeys;
+    }
+
+    public void own(List<Range> ranges) {
+        throw new java.lang.UnsupportedOperationException("function not implemented yet");
+    }
+
+    public void appendRange(String rangeId, List<Range> ranges) {
+        throw new java.lang.UnsupportedOperationException("function not implemented yet");
+    }
+
+    public void relinquish(String ownerId, String rangeId) {
+        throw new java.lang.UnsupportedOperationException("function not implemented yet");
+    }
 }
index accd13a..dc243fb 100644 (file)
@@ -8,7 +8,7 @@ import com.att.research.logging.format.ErrorTypes;
 import com.att.research.mdbc.mixins.MixinFactory;
 import com.att.research.mdbc.mixins.MusicInterface;
 import com.att.research.mdbc.mixins.MusicMixin;
-import com.att.research.mdbc.mixins.TxCommitProgress;
+import com.att.research.mdbc.tables.TxCommitProgress;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -61,10 +61,14 @@ public class StateManager {
        //\fixme this is not really used, delete!
         String cassandraUrl  = info.getProperty(Configuration.KEY_CASSANDRA_URL, Configuration.CASSANDRA_URL_DEFAULT);
         String mixin  = info.getProperty(Configuration.KEY_MUSIC_MIXIN_NAME, Configuration.MUSIC_MIXIN_DEFAULT);
+        init(mixin,cassandraUrl);
+    }
+
+    protected void init(String mixin, String cassandraUrl) throws MDBCServiceException {
         this.musicManager = MixinFactory.createMusicInterface(mixin, cassandraUrl, info,ranges);
         this.musicManager.createKeyspace();
         try {
-            this.musicManager.initializeMdbcDataStructures();
+            this.musicManager.initializeMetricDataStructures();
         } catch (MDBCServiceException e) {
             logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL, ErrorTypes.GENERALSERVICEERROR);
             throw(e);
index 78850e3..d74dafb 100644 (file)
@@ -22,8 +22,8 @@ public class NodeConfiguration {
     public DatabasePartition partition;
     public String nodeName;
 
-    public NodeConfiguration(String tables, String titIndex, String titTableName, String partitionId, String sqlDatabaseName, String node, String redoRecordsTable){
-        partition = new DatabasePartition(toRanges(tables), titIndex, titTableName, partitionId, null, redoRecordsTable) ;
+    public NodeConfiguration(String tables, String mriIndex, String mriTableName, String partitionId, String sqlDatabaseName, String node, String redoRecordsTable){
+        partition = new DatabasePartition(toRanges(tables), mriIndex,  mriTableName, partitionId, null, redoRecordsTable) ;
         this.sqlDatabaseName = sqlDatabaseName;
         this.nodeName = node;
     }
index 0d28b51..eeb15a5 100644 (file)
@@ -19,7 +19,7 @@ import java.util.List;
 public class TablesConfiguration {
 
     private final String TIT_TABLE_NAME = "transactioninformation";
-    private final String REDO_RECORDS_NAME = "redorecords";
+    private final String MUSIC_TX_DIGEST_TABLE_NAME = "musictxdigest";
 
     private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TablesConfiguration.class);
     private List<PartitionInformation> partitions;
@@ -54,13 +54,13 @@ public class TablesConfiguration {
             throw new MDBCServiceException("Partition was not correctly initialized");
         }
         for(PartitionInformation partitionInfo : partitions){
-            String titTableName = partitionInfo.titTableName;
+            String titTableName = partitionInfo.mriTableName;
             titTableName = (titTableName==null || titTableName.isEmpty())?TIT_TABLE_NAME:titTableName;
             //0) Create the corresponding TIT table
-            DatabaseOperations.CreateTransactionInformationTable(musicNamespace,titTableName);
-            String redoRecordsName = partitionInfo.rrtTableName;
-            redoRecordsName = (redoRecordsName==null || redoRecordsName.isEmpty())?REDO_RECORDS_NAME:redoRecordsName;
-            DatabaseOperations.CreateRedoRecordsTable(-1,musicNamespace,redoRecordsName);
+            DatabaseOperations.CreateMusicRangeInformationTable(musicNamespace,titTableName);
+            String musicTxDigestTableName = partitionInfo.mtxdTableName;
+            musicTxDigestTableName = (musicTxDigestTableName==null || musicTxDigestTableName.isEmpty())? MUSIC_TX_DIGEST_TABLE_NAME :musicTxDigestTableName;
+            DatabaseOperations.CreateMusicTxDigest(-1,musicNamespace,musicTxDigestTableName);
             //0) Create the corresponding TIT table
             String partitionId;
             if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){
@@ -87,7 +87,7 @@ public class TablesConfiguration {
             //5) Add it to the redo history table
             DatabaseOperations.createRedoHistoryBeginRow(musicNamespace,rhName,newRedoRow,partitionId,null);
             //6) Create config for this node
-            nodeConfigs.add(new NodeConfiguration(String.join(",",partitionInfo.tables),titIndex,titTableName,partitionId,sqlDatabaseName,partitionInfo.owner,redoRecordsName));
+            nodeConfigs.add(new NodeConfiguration(String.join(",",partitionInfo.tables),titIndex,titTableName,partitionId,sqlDatabaseName,partitionInfo.owner,musicTxDigestTableName));
         }
         return nodeConfigs;
     }
@@ -124,8 +124,8 @@ public class TablesConfiguration {
     public class PartitionInformation{
         private List<String> tables;
         private String owner;
-        private String titTableName;
-        private String rrtTableName;
+        private String mriTableName;
+        private String mtxdTableName;
         private String partitionId;
         private int replicationFactor;
 
@@ -145,12 +145,12 @@ public class TablesConfiguration {
             this.owner = owner;
         }
 
-        public String getTitTableName() {
-            return titTableName;
+        public String getMriTableName() {
+            return mriTableName;
         }
 
-        public void setTitTableName(String titTableName) {
-            this.titTableName = titTableName;
+        public void setMriTableName(String mriTableName) {
+            this.mriTableName = mriTableName;
         }
 
         public String getPartitionId() {
@@ -169,12 +169,12 @@ public class TablesConfiguration {
             this.replicationFactor = replicationFactor;
         }
 
-        public String getRrtTableName(){
-           return rrtTableName;
+        public String getMtxdTableName(){
+           return mtxdTableName;
         }
 
-        public void setRrtTableName(String rrtTableName) {
-            this.rrtTableName = rrtTableName;
+        public void setMtxdTableName(String mtxdTableName) {
+            this.mtxdTableName = mtxdTableName;
         }
     }
 }
index 96d947c..2207a52 100644 (file)
@@ -1,9 +1,9 @@
 {
   "sqlDatabaseName": "test",
   "partition": {
-    "transactionInformationTable": "transactioninformation",
-    "transactionInformationIndex": "259a7a7c-f741-44ae-8d6e-227a02ddc96e",
-    "redoRecordsTable": "redorecords",
+    "musicRangeInformationTable": "transactioninformation",
+    "musicRangeInformationIndex": "259a7a7c-f741-44ae-8d6e-227a02ddc96e",
+    "musicTxDigestTable": "musictxdigest",
     "partitionId": "ad766447-1adf-4800-aade-9f31a356ab4b",
     "lockId": "",
     "ranges": [
index afa343b..2a792e8 100644 (file)
@@ -1,6 +1,6 @@
 {
-  "transactionInformationTable": "transactioninformation",
-  "transactionInformationIndex": "d0e8ef2e-aeca-4261-8d9d-1679f560b85b",
+  "musicRangeInformationTable": "transactioninformation",
+  "musicRangeInformationIndex": "d0e8ef2e-aeca-4261-8d9d-1679f560b85b",
   "partitionId": "798110cf-9c61-4db2-9446-cb2dbab5a143",
   "lockId": "",
   "ranges": [
index b3c6224..e67dd0b 100644 (file)
@@ -3,8 +3,8 @@
     {
       "tables":["table11"],
       "owner":"",
-      "titTableName":"transactioninformation",
-      "rrtTableName":"redorecords",
+      "mriTableName":"musicrangeinformation",
+      "mtxdTableName":"musictxdigest",
       "partitionId":"",
       "replicationFactor":1
     }
index 6684fe6..28090b7 100755 (executable)
@@ -16,6 +16,15 @@ import java.util.TreeSet;
 import java.util.UUID;
 
 import com.att.research.mdbc.*;
+import com.att.research.mdbc.tables.PartitionInformation;
+import com.att.research.mdbc.tables.RedoHistoryElement;
+import com.att.research.mdbc.tables.RedoRecordId;
+import com.att.research.mdbc.tables.StagingTable;
+import com.att.research.mdbc.tables.TablePartitionInformation;
+import com.att.research.mdbc.tables.TitReference;
+import com.att.research.mdbc.tables.TransactionInformationElement;
+import com.att.research.mdbc.tables.TxCommitProgress;
+
 import org.json.JSONObject;
 import org.onap.music.datastore.CassaLockStore;
 import org.onap.music.datastore.PreparedQueryObject;
@@ -84,8 +93,8 @@ public class CassandraMixin implements MusicInterface {
        public static final String PARTITION_INFORMATION_TABLE_NAME = "partitioninfo";
        public static final String REDO_HISTORY_TABLE_NAME= "redohistory";
        //\TODO Add logic to change the names when required and create the tables when necessary
-    private String redoRecordTableName = "redorecords";
-       private String transactionInformationTableName = "transactioninformation";
+    private String musicTxDigestTableName = "musictxdigest";
+       private String musicRangeInformationTableName = "musicrangeinformation";
 
        private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(CassandraMixin.class);
        
@@ -152,7 +161,7 @@ public class CassandraMixin implements MusicInterface {
 
                this.music_ns       = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
                logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
-        transactionInformationTableName = "transactioninformation";
+        musicRangeInformationTableName = "transactioninformation";
         createMusicKeyspace();
     }
 
@@ -211,10 +220,10 @@ public class CassandraMixin implements MusicInterface {
                }
        }
        @Override
-       public void initializeMdbcDataStructures() throws MDBCServiceException {
+       public void initializeMetricDataStructures() throws MDBCServiceException {
            try {
-                       DatabaseOperations.CreateRedoRecordsTable(-1, music_ns, redoRecordTableName);//\TODO If we start partitioning the data base, we would need to use the redotable number
-                       DatabaseOperations.CreateTransactionInformationTable(music_ns, transactionInformationTableName);
+                       DatabaseOperations.CreateMusicTxDigest(-1, music_ns, musicTxDigestTableName);//\TODO If we start partitioning the data base, we would need to use the redotable number
+                       DatabaseOperations.CreateMusicRangeInformationTable(music_ns, musicRangeInformationTableName);
                        DatabaseOperations.CreateTableToPartitionTable(music_ns, TABLE_TO_PARTITION_TABLE_NAME);
                        DatabaseOperations.CreatePartitionInfoTable(music_ns, PARTITION_INFORMATION_TABLE_NAME);
                        DatabaseOperations.CreateRedoHistoryTable(music_ns, REDO_HISTORY_TABLE_NAME);
@@ -1016,7 +1025,7 @@ public class CassandraMixin implements MusicInterface {
        }
 
 
-       private PreparedQueryObject createAppendRRTIndexToTitQuery(String titTable, String uuid, String table, String redoUuid){
+       private PreparedQueryObject createAppendMtxdIndexToMriQuery(String titTable, String uuid, String table, String redoUuid){
         PreparedQueryObject query = new PreparedQueryObject();
         StringBuilder appendBuilder = new StringBuilder();
         appendBuilder.append("UPDATE ")
@@ -1088,12 +1097,12 @@ public class CassandraMixin implements MusicInterface {
         return lockId;
     }
 
-    protected void pushRowToRRT(String lockId, String commitId, HashMap<Range,StagingTable> transactionDigest) throws MDBCServiceException{
+    protected void pushRowToMtxd(String lockId, String commitId, HashMap<Range,StagingTable> transactionDigest) throws MDBCServiceException{
                PreparedQueryObject query = new PreparedQueryObject();
            StringBuilder cqlQuery = new StringBuilder("INSERT INTO ")
                   .append(music_ns)
                   .append('.')
-                     .append(redoRecordTableName)
+                     .append(musicTxDigestTableName)
                      .append(" (leaseid,leasecounter,transactiondigest) ")
                      .append("VALUES ('")
                      .append( lockId ).append("',")
@@ -1115,15 +1124,15 @@ public class CassandraMixin implements MusicInterface {
         }
     }
 
-    protected void appendIndexToTit(String lockId, String commitId, String TITIndex) throws MDBCServiceException{
+    protected void appendIndexToMri(String lockId, String commitId, String TITIndex) throws MDBCServiceException{
         StringBuilder redoUuidBuilder  = new StringBuilder();
         redoUuidBuilder.append("('")
                 .append(lockId)
                 .append("',")
                 .append(commitId)
                 .append(")");
-        PreparedQueryObject appendQuery = createAppendRRTIndexToTitQuery(transactionInformationTableName, TITIndex, redoRecordTableName, redoUuidBuilder.toString());
-        ReturnType returnType = MusicPureCassaCore.criticalPut(music_ns, transactionInformationTableName, TITIndex, appendQuery, lockId, null);
+        PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, TITIndex, musicTxDigestTableName, redoUuidBuilder.toString());
+        ReturnType returnType = MusicPureCassaCore.criticalPut(music_ns, musicRangeInformationTableName, TITIndex, appendQuery, lockId, null);
         if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
             logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
             throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
@@ -1132,16 +1141,16 @@ public class CassandraMixin implements MusicInterface {
 
        @Override
        public void commitLog(DBInterface dbi, DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
-               String TITIndex = partition.getTransactionInformationIndex();
+               String TITIndex = partition.getMusicRangeInformationIndex();
                if(TITIndex.isEmpty()) {
                        //\TODO Fetch TITIndex from the Range Information Table 
                        throw new MDBCServiceException("TIT Index retrieval not yet implemented");
                }
-        String fullyQualifiedTitKey = music_ns+"."+ transactionInformationTableName +"."+TITIndex;
+        String fullyQualifiedTitKey = music_ns+"."+ musicRangeInformationTableName +"."+TITIndex;
                //0. See if reference to lock was already created
                String lockId = partition.getLockId();
                if(lockId == null || lockId.isEmpty()) {
-            lockId = createAndAssignLock(fullyQualifiedTitKey,partition,music_ns,transactionInformationTableName,TITIndex);
+            lockId = createAndAssignLock(fullyQualifiedTitKey,partition,music_ns, musicRangeInformationTableName,TITIndex);
                }
 
                String commitId;
@@ -1156,14 +1165,14 @@ public class CassandraMixin implements MusicInterface {
         //Add creation type of transaction digest
 
                //1. Push new row to RRT and obtain its index
-        pushRowToRRT(lockId, commitId, transactionDigest);
+        pushRowToMtxd(lockId, commitId, transactionDigest);
 
         //2. Save RRT index to RQ
                if(progressKeeper!= null) {
                        progressKeeper.setRecordId(txId,new RedoRecordId(lockId, commitId));
                }
                //3. Append RRT index into the corresponding TIT row array
-        appendIndexToTit(lockId,commitId,TITIndex);
+        appendIndexToMri(lockId,commitId,TITIndex);
     }
 
     /**
index 9aa94f9..e2b2ad7 100755 (executable)
@@ -7,6 +7,7 @@ import java.util.Set;
 
 import com.att.research.mdbc.Range;
 import com.att.research.mdbc.TableInfo;
+import com.att.research.mdbc.tables.StagingTable;
 
 /**
  * This Interface defines the methods that MDBC needs in order to mirror data to/from a Database instance.
index 94b3ac6..6e2e0ca 100755 (executable)
@@ -10,6 +10,15 @@ import com.att.research.exceptions.MDBCServiceException;
 import com.att.research.mdbc.DatabasePartition;
 import com.att.research.mdbc.Range;
 import com.att.research.mdbc.TableInfo;
+import com.att.research.mdbc.tables.PartitionInformation;
+import com.att.research.mdbc.tables.RedoHistoryElement;
+import com.att.research.mdbc.tables.RedoRecordId;
+import com.att.research.mdbc.tables.StagingTable;
+import com.att.research.mdbc.tables.TablePartitionInformation;
+import com.att.research.mdbc.tables.TitReference;
+import com.att.research.mdbc.tables.TransactionInformationElement;
+import com.att.research.mdbc.tables.TxCommitProgress;
+
 import org.onap.music.exceptions.MusicLockingException;
 
 /**
@@ -22,7 +31,7 @@ public interface MusicInterface {
         * This function is used to created all the required data structures, both local  
         * \TODO Check if this function is required in the MUSIC interface or could be just created on the constructor
         */
-       void initializeMdbcDataStructures() throws MDBCServiceException;
+       void initializeMetricDataStructures() throws MDBCServiceException;
        /**
         * Get the name of this MusicInterface mixin object.
         * @return the name
index 1fee59c..60adaf1 100644 (file)
@@ -18,6 +18,15 @@ import com.att.research.exceptions.MDBCServiceException;
 import com.att.research.mdbc.DatabasePartition;
 import com.att.research.mdbc.Range;
 import com.att.research.mdbc.TableInfo;
+import com.att.research.mdbc.tables.PartitionInformation;
+import com.att.research.mdbc.tables.RedoHistoryElement;
+import com.att.research.mdbc.tables.RedoRecordId;
+import com.att.research.mdbc.tables.StagingTable;
+import com.att.research.mdbc.tables.TablePartitionInformation;
+import com.att.research.mdbc.tables.TitReference;
+import com.att.research.mdbc.tables.TransactionInformationElement;
+import com.att.research.mdbc.tables.TxCommitProgress;
+
 import org.onap.music.main.MusicPureCassaCore;
 
 /**
@@ -172,7 +181,7 @@ public class MusicMixin implements MusicInterface {
        }
 
        @Override
-       public void initializeMdbcDataStructures() {
+       public void initializeMetricDataStructures() {
                // 
                
        }
index a836a39..4f70147 100755 (executable)
@@ -21,6 +21,8 @@ import com.att.research.logging.EELFLoggerDelegate;
 import com.att.research.mdbc.MusicSqlManager;
 import com.att.research.mdbc.Range;
 import com.att.research.mdbc.TableInfo;
+import com.att.research.mdbc.tables.OperationType;
+import com.att.research.mdbc.tables.StagingTable;
 
 import net.sf.jsqlparser.JSQLParserException;
 import net.sf.jsqlparser.parser.CCJSqlParserUtil;
diff --git a/src/main/java/com/att/research/mdbc/tables/Operation.java b/src/main/java/com/att/research/mdbc/tables/Operation.java
new file mode 100644 (file)
index 0000000..026fa40
--- /dev/null
@@ -0,0 +1,35 @@
+package com.att.research.mdbc.tables;
+
+import java.io.Serializable;
+
+import org.json.JSONObject;
+import org.json.JSONTokener;
+
+public final class Operation implements Serializable{
+
+       private static final long serialVersionUID = -1215301985078183104L;
+
+       final OperationType TYPE;
+       final String OLD_VAL;
+       final String NEW_VAL;
+
+       public Operation(OperationType type, String newVal, String oldVal) {
+               TYPE = type;
+               NEW_VAL = newVal;
+               OLD_VAL = oldVal;
+       }
+
+       public JSONObject getNewVal(){
+        JSONObject newRow  = new JSONObject(new JSONTokener(NEW_VAL));
+        return newRow;
+    }
+
+    public JSONObject getOldVal(){
+        JSONObject keydata = new JSONObject(new JSONTokener(OLD_VAL));
+        return keydata;
+    }
+    
+    public OperationType getOperationType() {
+       return this.TYPE;
+    }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/OperationType.java b/src/main/java/com/att/research/mdbc/tables/OperationType.java
new file mode 100644 (file)
index 0000000..ae83485
--- /dev/null
@@ -0,0 +1,5 @@
+package com.att.research.mdbc.tables;
+
+public enum OperationType{
+       DELETE, UPDATE, INSERT, SELECT
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/PartitionInformation.java b/src/main/java/com/att/research/mdbc/tables/PartitionInformation.java
new file mode 100644 (file)
index 0000000..9249844
--- /dev/null
@@ -0,0 +1,19 @@
+package com.att.research.mdbc.tables;
+
+import java.util.List;
+
+public class PartitionInformation {
+       public final String partition;
+       public final TitReference tit; 
+       public final List<String> tables;
+       public final int replicationFactor;
+       public final String currentOwner;
+       
+       public PartitionInformation(String partition, TitReference tit, List<String> tables, int replicationFactor, String currentOwner) {
+               this.partition=partition;
+               this.tit=tit;
+               this.tables=tables;
+               this.replicationFactor=replicationFactor;
+               this.currentOwner=currentOwner;
+       }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java b/src/main/java/com/att/research/mdbc/tables/RedoHistoryElement.java
new file mode 100644 (file)
index 0000000..8d92216
--- /dev/null
@@ -0,0 +1,15 @@
+package com.att.research.mdbc.tables;
+
+import java.util.List;
+
+public final class RedoHistoryElement {
+       public final String partition;
+       public final TitReference current;
+       public final List<TitReference> previous;
+       
+       public RedoHistoryElement(String partition, TitReference current, List<TitReference> previous) {
+               this.partition = partition;
+               this.current = current;
+               this.previous = previous;
+       }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/RedoRecordId.java b/src/main/java/com/att/research/mdbc/tables/RedoRecordId.java
new file mode 100644 (file)
index 0000000..225c89e
--- /dev/null
@@ -0,0 +1,15 @@
+package com.att.research.mdbc.tables;
+
+public final class RedoRecordId {
+       public final String leaseId;
+       public final String commitId;
+
+       public RedoRecordId(String leaseId, String commitId) {
+               this.leaseId = leaseId;
+               this.commitId = commitId;
+       }
+
+       public boolean isEmpty() {
+               return (this.leaseId==null || this.leaseId.isEmpty())&&(this.commitId==null||this.commitId.isEmpty());
+       }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/StagingTable.java b/src/main/java/com/att/research/mdbc/tables/StagingTable.java
new file mode 100644 (file)
index 0000000..c16f11c
--- /dev/null
@@ -0,0 +1,51 @@
+package com.att.research.mdbc.tables;
+
+import java.io.Serializable;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+import org.json.JSONObject;
+
+import com.att.research.logging.EELFLoggerDelegate;
+
+public class StagingTable implements Serializable{
+       /**
+        * 
+        */
+       private static final long serialVersionUID = 7583182634761771943L;
+       private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(StagingTable.class);
+       //primary key -> Operation
+       private HashMap<String,Deque<Operation>> operations;
+       
+       public StagingTable() {
+               operations = new HashMap<>();
+       }
+       
+       synchronized public void addOperation(String key, OperationType type, String oldVal, String newVal) {
+               if(!operations.containsKey(key)) {
+                       operations.put(key, new LinkedList<>());
+               }
+               operations.get(key).add(new Operation(type,newVal,oldVal));
+       }
+       
+       synchronized public Deque<Pair<String,Operation>> getIterableSnapshot() throws NoSuchFieldException{
+               Deque<Pair<String,Operation>> response=new LinkedList<Pair<String,Operation>>();
+               //\TODO: check if we can just return the last change to a given key 
+               Set<String> keys = operations.keySet();
+               for(String key : keys) {
+                       Deque<Operation> ops = operations.get(key);
+                       if(ops.isEmpty()) {
+                               logger.error(EELFLoggerDelegate.errorLogger, "Invalid state of the Operation data structure when creating snapshot");
+                               throw new NoSuchFieldException("Invalid state of the operation data structure");
+                       }
+                       response.add(Pair.of(key,ops.getLast()));
+               }
+               return response;
+       }
+       
+       synchronized public void clean() {
+               operations.clear();
+       }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java b/src/main/java/com/att/research/mdbc/tables/TablePartitionInformation.java
new file mode 100644 (file)
index 0000000..9201de5
--- /dev/null
@@ -0,0 +1,15 @@
+package com.att.research.mdbc.tables;
+
+import java.util.List;
+
+public final class TablePartitionInformation {
+       public final String table;
+       public final String partition;
+       public final List<String> oldPartitions;
+
+       public TablePartitionInformation(String table, String partition, List<String> oldPartitions) {
+               this.table = table;
+               this.partition = partition;
+               this.oldPartitions = oldPartitions;
+       }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/TitReference.java b/src/main/java/com/att/research/mdbc/tables/TitReference.java
new file mode 100644 (file)
index 0000000..2abb989
--- /dev/null
@@ -0,0 +1,12 @@
+package com.att.research.mdbc.tables;
+
+public final class TitReference {
+       public final String table;
+       public final String index;
+
+       public TitReference(String table, String index) {
+               this.table = table;
+               this.index=  index;
+       }
+
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java b/src/main/java/com/att/research/mdbc/tables/TransactionInformationElement.java
new file mode 100644 (file)
index 0000000..a80cab1
--- /dev/null
@@ -0,0 +1,19 @@
+package com.att.research.mdbc.tables;
+
+import java.util.List;
+
+public final class TransactionInformationElement {
+       public final String index;
+       public final List<RedoRecordId> redoLog;
+       public final String partition;
+       public final int latestApplied;
+       public final boolean applied;
+       
+       public TransactionInformationElement(String index, List<RedoRecordId> redoLog, String partition, int latestApplied, boolean applied) {
+               this.index = index;
+               this.redoLog = redoLog;
+               this.partition = partition;
+               this.latestApplied = latestApplied;
+               this.applied = applied;
+       }
+}
diff --git a/src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java b/src/main/java/com/att/research/mdbc/tables/TxCommitProgress.java
new file mode 100644 (file)
index 0000000..cecdb08
--- /dev/null
@@ -0,0 +1,206 @@
+package com.att.research.mdbc.tables;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import com.att.research.logging.EELFLoggerDelegate;
+
+import java.sql.Connection;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+public class TxCommitProgress{
+       private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(TxCommitProgress.class);
+
+       private AtomicReference<BigInteger> nextCommitId;
+       private Map<String, CommitProgress> transactionInfo;
+
+       public TxCommitProgress(){
+               nextCommitId=new AtomicReference<>(BigInteger.ZERO);
+               transactionInfo = new ConcurrentHashMap<>();
+       }
+       
+       public boolean containsTx(String txId) {
+               return transactionInfo.containsKey(txId);
+       }
+       
+       public BigInteger getCommitId(String txId) {
+               CommitProgress prog = transactionInfo.get(txId);
+               if(prog.isCommitIdAssigned()) {
+                       return prog.getCommitId();
+               }
+               BigInteger commitId = nextCommitId.getAndUpdate((a)-> a.add(BigInteger.ONE));
+               prog.setCommitId(commitId);
+               return commitId;
+       }
+       
+       public void createNewTransactionTracker(String id, Connection conn) {
+               transactionInfo.put(id, new CommitProgress(id,conn));
+       }
+       
+       public void commitRequested(String txId) {
+               CommitProgress prog = transactionInfo.get(txId);
+               if(prog == null){
+                       logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when storing commit request",txId);
+               }
+               prog.setCommitRequested();
+       }
+       
+       public void setSQLDone(String txId) {
+               CommitProgress prog = transactionInfo.get(txId);
+               if(prog == null){
+                       logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when storing saving completion of SQL",txId);
+               }
+               prog.setSQLCompleted();
+       }
+
+       public void setMusicDone(String txId) {
+               CommitProgress prog = transactionInfo.get(txId);
+               if(prog == null){
+                       logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when storing saving completion of Music",txId);
+               }
+               prog.setMusicCompleted();
+       }
+       
+       public Connection getConnection(String txId){
+               CommitProgress prog = transactionInfo.get(txId);
+               if(prog == null){
+                       logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when retrieving statement",txId);
+               }
+               return prog.getConnection();
+       }
+       
+       public void setRecordId(String txId, RedoRecordId recordId){
+               CommitProgress prog = transactionInfo.get(txId);
+               if(prog == null){
+                       logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when setting record Id",txId);
+               }
+               prog.setRecordId(recordId);
+       }
+       
+       public RedoRecordId getRecordId(String txId) {
+               CommitProgress prog = transactionInfo.get(txId);
+               if(prog == null){
+                       logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when getting record Id",txId);
+               }
+               return prog.getRecordId();
+       }
+       
+       public boolean isRecordIdAssigned(String txId) {
+               CommitProgress prog = transactionInfo.get(txId);
+               if(prog == null){
+                       logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when checking record",txId);
+               }
+               return prog.isRedoRecordAssigned();
+       }
+       
+       public boolean isComplete(String txId) {
+               CommitProgress prog = transactionInfo.get(txId);
+               if(prog == null){
+                       logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when checking completion",txId);
+               }
+               return prog.isComplete();
+       }
+       
+       public void reinitializeTxProgress(String txId) {
+               CommitProgress prog = transactionInfo.get(txId);
+               if(prog == null){
+                       logger.error(EELFLoggerDelegate.errorLogger, "Transaction doesn't exist: [%l], failure when reinitializing tx progress",txId);
+               }
+               prog.reinitialize();
+       }
+
+       public void deleteTxProgress(String txId){
+               transactionInfo.remove(txId);
+       }
+}
+
+final class CommitProgress{
+       private String lTxId; // local transaction id  
+       private BigInteger commitId; // commit id
+       private boolean commitRequested; //indicates if the user tried to commit the request already.
+       private boolean SQLDone; // indicates if SQL was already committed 
+       private boolean MusicDone; // indicates if music commit was already performed, atomic bool
+       private Connection connection;// reference to a connection object. This is used to complete a commit if it failed in the original thread.
+       private Long timestamp; // last time this data structure was updated
+       private RedoRecordId redoRecordId;// record id for each partition
+
+       public CommitProgress(String id,Connection conn){
+               redoRecordId=null;
+               lTxId = id;
+               commitRequested = false;
+               SQLDone = false;
+               MusicDone = false;
+               connection = conn;
+           commitId = null;
+               timestamp = System.currentTimeMillis();
+       }
+       
+       public synchronized boolean isComplete() {
+               return commitRequested && SQLDone && MusicDone;
+       }
+       
+       public synchronized void setCommitId(BigInteger commitId) {
+               this.commitId = commitId;
+               timestamp = System.currentTimeMillis();
+       }
+       
+       public synchronized void reinitialize() {
+               commitId = null;
+               redoRecordId=null;
+               commitRequested = false;
+               SQLDone = false;
+               MusicDone = false;
+               timestamp = System.currentTimeMillis();
+       }
+
+       public synchronized void setCommitRequested() {
+               commitRequested = true;
+               timestamp = System.currentTimeMillis();
+       }
+
+       public synchronized void setSQLCompleted() {
+               SQLDone = true;
+               timestamp = System.currentTimeMillis();
+       }
+       
+       public synchronized void setMusicCompleted() {
+               MusicDone = true;
+               timestamp = System.currentTimeMillis();
+       }
+       
+       public Connection getConnection() {
+               timestamp = System.currentTimeMillis();
+               return connection;
+       } 
+       
+       public long getTimestamInMillis() {
+               return timestamp;
+       }
+
+       public synchronized void setRecordId(RedoRecordId id) {
+               redoRecordId =  id;
+               timestamp = System.currentTimeMillis();
+       }
+       
+       public synchronized boolean isRedoRecordAssigned() {
+               return this.redoRecordId!=null;
+       } 
+
+       public synchronized RedoRecordId getRecordId() {
+               return redoRecordId;
+       } 
+       
+       public synchronized BigInteger getCommitId() {
+               return commitId;
+       }
+       
+       public synchronized String getId() {
+               return this.lTxId;
+       }
+       
+       public synchronized boolean isCommitIdAssigned() {
+               return this.commitId!= null;
+       }
+}
\ No newline at end of file
index 09524cb..83f210d 100644 (file)
@@ -1,17 +1,10 @@
 package com.att.research.mdbc.tools;
 
 import com.att.research.logging.EELFLoggerDelegate;
-import com.att.research.mdbc.DatabasePartition;
-import com.att.research.mdbc.MDBCUtils;
-import com.att.research.mdbc.Range;
 import com.att.research.mdbc.configurations.NodeConfiguration;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
 public class CreatePartition {
     public static final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(CreatePartition.class);
 
@@ -27,9 +20,9 @@ public class CreatePartition {
     @Parameter(names = { "-n", "--tit-table-name" }, required = true,
             description = "Tit Table name")
     private String titTable;
-    @Parameter(names = { "-r", "--redorecords-table-name" }, required = true,
-            description = "Redo Records Table name")
-    private String rrTable;
+    @Parameter(names = { "-r", "--music-tx-digest-table-name" }, required = true,
+            description = "Music Transaction Digest Table name")
+    private String mTxDTable;
     @Parameter(names = { "-p", "--partition-id" }, required = true,
             description = "Partition Id")
     private String partitionId;
@@ -43,7 +36,7 @@ public class CreatePartition {
     }
 
     public void convert(){
-        config = new NodeConfiguration(tables,titIndex,titTable,partitionId,"test","",rrTable);
+        config = new NodeConfiguration(tables,titIndex,titTable,partitionId,"test","", mTxDTable);
     }
 
     public void saveToFile(){
index cdee078..28af754 100644 (file)
@@ -1,7 +1,8 @@
 package com.att.research.mdbc;
 
-import com.att.research.mdbc.mixins.OperationType;
-import com.att.research.mdbc.mixins.StagingTable;
+import com.att.research.mdbc.tables.OperationType;
+import com.att.research.mdbc.tables.StagingTable;
+
 import org.json.JSONArray;
 import org.json.JSONObject;
 import org.junit.Test;