Add Eventual consistency logic 64/76064/2
authorst782s <statta@research.att.com>
Mon, 21 Jan 2019 12:21:46 +0000 (07:21 -0500)
committerst782s <statta@research.att.com>
Tue, 22 Jan 2019 20:57:08 +0000 (15:57 -0500)
Issue-ID: MUSIC-276
Change-Id: Ie6b2508c57f0a7b677f48f87c991adcd613147cc
Signed-off-by: st782s <statta@research.att.com>
14 files changed:
mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServerLogic.java
mdbc-server/src/main/java/org/onap/music/mdbc/Range.java
mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java [new file with mode: 0644]
mdbc-server/src/main/java/org/onap/music/mdbc/configurations/NodeConfiguration.java
mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java [new file with mode: 0644]
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
mdbc-server/src/test/java/org/onap/music/mdbc/MySQLMixinTest.java
mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java

index 629380d..6c1163a 100755 (executable)
@@ -53,6 +53,7 @@ import org.onap.music.mdbc.ownership.Dag;
 import org.onap.music.mdbc.ownership.DagNode;
 import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
 import org.onap.music.mdbc.query.QueryProcessor;
+import org.onap.music.mdbc.tables.MusicTxDigest;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
 import org.onap.music.mdbc.tables.StagingTable;
 import org.onap.music.mdbc.tables.TxCommitProgress;
@@ -69,8 +70,8 @@ import org.onap.music.mdbc.tables.TxCommitProgress;
 public class MdbcConnection implements Connection {
     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MdbcConnection.class);
 
-    private final String id;                   // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused
-    private final Connection jdbcConn;         // the JDBC Connection to the actual underlying database
+    private final String id;            // This is the transaction id, assigned to this connection. There is no need to change the id, if connection is reused
+    private final Connection jdbcConn;      // the JDBC Connection to the actual underlying database
     private final MusicInterface mi;
     private final TxCommitProgress progressKeeper;
     private final DBInterface dbi;
@@ -80,7 +81,7 @@ public class MdbcConnection implements Connection {
     private DatabasePartition partition;
 
     public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi,
-               TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException {
+            TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException {
         this.id = id;
         this.table_set = Collections.synchronizedSet(new HashSet<String>());
         this.transactionDigest = new HashMap<Range,StagingTable>();
@@ -161,7 +162,7 @@ public class MdbcConnection implements Connection {
                     throw new SQLException("tx id is null");
                 }
                 try {
-                    mi.commitLog(partition, transactionDigest, id, progressKeeper);
+                    mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper);
                 } catch (MDBCServiceException e) {
                     // TODO Auto-generated catch block
                     logger.error("Cannot commit log to music" + e.getStackTrace());
@@ -203,7 +204,7 @@ public class MdbcConnection implements Connection {
         try {
             logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
             // transaction was committed -- add all the updates into the REDO-Log in MUSIC
-            mi.commitLog(partition, transactionDigest, id, progressKeeper);
+            mi.commitLog(partition, statemanager.getEventualRanges(), transactionDigest, id, progressKeeper);
         } catch (MDBCServiceException e) {
             //If the commit fail, then a new commitId should be used
             logger.error(EELFLoggerDelegate.errorLogger, "Commit to music failed", AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
@@ -226,7 +227,8 @@ public class MdbcConnection implements Connection {
 
         //\TODO try to execute outside of the critical path of commit
         try {
-            relinquishIfRequired(partition);
+            if(partition != null)
+                relinquishIfRequired(partition);
         } catch (MDBCServiceException e) {
             logger.warn("Error trying to relinquish: "+partition.toString());
         }
@@ -489,12 +491,20 @@ public class MdbcConnection implements Connection {
         //Parse tables from the sql query
         Map<String, List<String>> tableToInstruction = QueryProcessor.extractTableFromQuery(sql);
         //Check ownership of keys
-        List<Range> ranges = MDBCUtils.getTables(tableToInstruction);
-        this.partition = own(ranges);
+        List<Range> queryTables = MDBCUtils.getTables(tableToInstruction);
+        // filter out ranges that fall under Eventually consistent
+        // category as these tables do not need ownership
+        List<Range> scQueryTables = filterEveTables( queryTables);
+        this.partition = own(scQueryTables);
         dbi.preStatementHook(sql);
     }
 
 
+    private List<Range> filterEveTables(List<Range> queryTables) {
+        queryTables.removeAll(statemanager.getEventualRanges());
+        return queryTables;
+    }
+
     /**
      * Code to be run within the DB driver after a SQL statement has been executed.  This is where remote
      * statement actions can be copied back to Cassandra/MUSIC.
@@ -510,7 +520,7 @@ public class MdbcConnection implements Connection {
      * in order to prevent multiple threads from running this code in parallel.
      */
     public void synchronizeTables() throws QueryException {
-        Set<String> set1 = dbi.getSQLTableSet();       // set of tables in the database
+        Set<String> set1 = dbi.getSQLTableSet();    // set of tables in the database
         logger.debug(EELFLoggerDelegate.applicationLogger, "synchronizing tables:" + set1);
         for (String tableName : set1) {
             // This map will be filled in if this table was previously discovered
index cb0cea9..9fb36ae 100755 (executable)
@@ -58,6 +58,8 @@ public class MdbcServerLogic extends JdbcMeta{
        public MdbcServerLogic(String Url, Properties info, NodeConfiguration config) throws SQLException, MDBCServiceException {
                super(Url,info);
                this.manager = new StateManager(Url,info,config.nodeName, config.sqlDatabaseName); //FIXME: db name should not be passed in ahead of time
+               manager.setEventualRanges(config.getEventual().getRanges());
+               
                this.info = info;
         int concurrencyLevel = Integer.parseInt(
                 info.getProperty(ConnectionCacheSettings.CONCURRENCY_LEVEL.key(),
@@ -293,6 +295,12 @@ public class MdbcServerLogic extends JdbcMeta{
                }               
        }
 
+
+
+
+
+
+
        @Override
        public void rollback(ConnectionHandle ch) {
                try {
index 214678a..efc89cd 100755 (executable)
@@ -33,10 +33,10 @@ public class Range implements Serializable, Cloneable{
 
        private static final long serialVersionUID = 1610744496930800088L;
 
-       public String table;
+       private String table;
 
        public Range(String table) {
-               this.table = table;
+               this.table = table.toUpperCase();
        }
 
        public String toString(){return table;}
@@ -51,12 +51,12 @@ public class Range implements Serializable, Cloneable{
                if (this == o) return true;
                if (o == null || getClass() != o.getClass()) return false;
                Range r = (Range) o;
-               return (this.overlaps(r)) && (r.overlaps(this));
+        return (this.overlaps(r)) && (r.overlaps(this));
        }
 
        @Override
        public int hashCode(){
-               return table.hashCode();
+        return table.hashCode();
        }
 
        @Override
@@ -76,4 +76,8 @@ public class Range implements Serializable, Cloneable{
                return table.equals(other.table);
        }
 
+    public String getTable() {
+        return table;
+    }
+
 }
index 1359a0d..4c5a3ed 100755 (executable)
@@ -79,7 +79,7 @@ public class StateManager {
     /** Identifier for this server instance */
     private String mdbcServerName;
     private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
-    
+    private List<Range> eventualRanges;
 
        public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
         this.sqlDBName = sqlDBName;
@@ -150,6 +150,14 @@ public class StateManager {
        }
 
 
+    public List<Range> getEventualRanges() {
+        return eventualRanges;
+    }
+
+    public void setEventualRanges(List<Range> eventualRanges) {
+        this.eventualRanges = eventualRanges;
+    }
+
     public void closeConnection(String connectionId){
         //\TODO check if there is a race condition
         if(mdbcConnections.containsKey(connectionId)) {
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java b/mdbc-server/src/main/java/org/onap/music/mdbc/configurations/Eventual.java
new file mode 100644 (file)
index 0000000..0021bcc
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.configurations;
+
+import java.util.List;
+import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.Range;
+
+/**
+ * This class represents meta information of tables categorized as eventually consistent
+ */
+public class Eventual {
+    
+    private transient static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(Eventual.class);
+
+    protected List<Range> ranges;
+
+    public Eventual(List<Range> ranges) {
+        super();
+        this.ranges = ranges;
+    }
+
+    public List<Range> getRanges() {
+        return ranges;
+    }
+
+    public void setRanges(List<Range> ranges) {
+        this.ranges = ranges;
+    }
+
+
+
+}
index 7a6aad7..96dc65f 100755 (executable)
@@ -37,12 +37,14 @@ public class NodeConfiguration {
     private static transient final EELFLoggerDelegate LOG = EELFLoggerDelegate.getLogger(NodeConfiguration.class);
 
     public DatabasePartition partition;
+    public Eventual eventual;
     public String nodeName;
     public String sqlDatabaseName;
 
     public NodeConfiguration(String tables, UUID mriIndex, String sqlDatabaseName, String node){
         //     public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) {
         partition = new DatabasePartition(toRanges(tables), mriIndex, null) ;
+        eventual = new Eventual(toRanges(tables));
         this.nodeName = node;
         this.sqlDatabaseName = sqlDatabaseName;
     }
@@ -87,4 +89,12 @@ public class NodeConfiguration {
         NodeConfiguration config = gson.fromJson(br, NodeConfiguration.class);
         return config;
     }
+
+    public Eventual getEventual() {
+        return eventual;
+    }
+
+    public void setEventual(Eventual eventual) {
+        this.eventual = eventual;
+    }
 }
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java b/mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java
new file mode 100644 (file)
index 0000000..338e697
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc.examples;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class MdbcEventualTestClient {
+
+
+
+    public static void main(String[] args){
+        try {
+            Class.forName("org.apache.calcite.avatica.remote.Driver");
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+            System.exit(1);
+        }
+        Connection connection;
+        try {
+            connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");
+        } catch (SQLException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        try {
+        connection.setAutoCommit(false);
+        } catch (SQLException e) {
+            e.printStackTrace();
+            return;
+        }
+
+
+        final String sql = "CREATE TABLE IF NOT EXISTS audit_log (\n" +
+                "    id int,\n" +
+                "    PersonID int,\n" +
+                "    timeID bigint,\n" +
+                "    PRIMARY KEY (id)" +
+                ");";
+        Statement stmt;
+        try {
+            stmt = connection.createStatement();
+        } catch (SQLException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        boolean execute = true;
+//        try {
+//            execute = stmt.execute(sql);
+//        } catch (SQLException e) {
+//            e.printStackTrace();
+//            return;
+//        }
+
+        if (execute) {
+            try {
+                connection.commit();
+            } catch (SQLException e) {
+                e.printStackTrace();
+            }
+        }
+
+        try {
+            stmt.close();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+
+        final String insertSQL = "INSERT INTO audit_log VALUES (1, 123, 123456789);";
+        final String insertSQL1 = "DELETE FROM audit_log WHERE PersonID=1;";
+        final String insertSQL2 = "INSERT INTO audit_log VALUES (1, 123, 123456789);";
+        final String insertSQL3 = "UPDATE audit_log SET PersonID=124 where id=1;";
+        final String insertSQL4 = "INSERT INTO audit_log VALUES (2, 234, 123456789);";
+
+
+        Statement insertStmt;
+        try {
+            insertStmt = connection.createStatement();
+        } catch (SQLException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        try {
+            execute = insertStmt.execute(insertSQL);
+            execute = insertStmt.execute(insertSQL1);
+            execute = insertStmt.execute(insertSQL2);
+            execute = insertStmt.execute(insertSQL3);
+            execute = insertStmt.execute(insertSQL4);
+
+        } catch (SQLException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        try {
+            connection.commit();
+        } catch (SQLException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        try {
+            stmt.close();
+            insertStmt.close();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+
+        try {
+            connection.close();
+        } catch (SQLException e) {
+            e.printStackTrace();
+        }
+
+
+    }
+}
index 8abfba1..35b6121 100755 (executable)
@@ -7,9 +7,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
+ * 
  *      http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 package org.onap.music.mdbc.mixins;
 
 import com.datastax.driver.core.ResultSet;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-
 import org.json.JSONObject;
-
 import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.exceptions.MusicServiceException;
 import org.onap.music.exceptions.MusicLockingException;
+import org.onap.music.exceptions.MusicServiceException;
 import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
+import org.onap.music.mdbc.tables.StagingTable;
+import org.onap.music.mdbc.tables.TxCommitProgress;
 import org.onap.music.mdbc.ownership.Dag;
 import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
 import org.onap.music.mdbc.tables.*;
@@ -100,7 +103,7 @@ public interface MusicInterface {
        /**
         * This method creates a keyspace in Music/Cassandra to store the data corresponding to the SQL tables.
         * The keyspace name comes from the initialization properties passed to the JDBC driver.
-        * @throws MusicServiceException
+        * @throws MusicServiceException 
         */
        void createKeyspace() throws MDBCServiceException;
        /**
@@ -167,7 +170,7 @@ public interface MusicInterface {
         * @param changedRow This is information about the row that has changed
         */
        void updateDirtyRowAndEntityTableInMusic(TableInfo ti, String tableName, JSONObject changedRow);
-
+       
        Object[] getObjects(TableInfo ti, String tableName, JSONObject row);
        /**
         * Returns the primary key associated with the given row 
@@ -182,21 +185,22 @@ public interface MusicInterface {
         * Commits the corresponding REDO-log into MUSIC
         *
         * @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx
+        * @param eventualRanges 
         * @param transactionDigest digest of the transaction that is being committed into the Redo log in music. It has to
-        * be a HashMap, because it is required to be serializable
+     * be a HashMap, because it is required to be serializable
         * @param txId id associated with the log being send
         * @param progressKeeper data structure that is used to handle to detect failures, and know what to do
         * @throws MDBCServiceException
         */
-       void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
-
+       void commitLog(DatabasePartition partition, List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest, String txId,TxCommitProgress progressKeeper) throws MDBCServiceException;
+       
 
-       /**
-        * This function is used to obtain the information related to a specific row in the MRI table
-        * @param partitionIndex index of the row that is going to be retrieved
-        * @return all the information related to the table
-        * @throws MDBCServiceException
-        */
+    /**
+     * This function is used to obtain the information related to a specific row in the MRI table
+     * @param partitionIndex index of the row that is going to be retrieved
+     * @return all the information related to the table
+     * @throws MDBCServiceException
+     */
        MusicRangeInformationRow getMusicRangeInformation(UUID partitionIndex) throws MDBCServiceException;
 
     /**
@@ -208,11 +212,11 @@ public interface MusicInterface {
        RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException;
 
        /**
-        * This function is used to create a new row in the MRI table
-        * @param info the information used to create the row
-        * @return the new partition object that contain the new information used to create the row
-        * @throws MDBCServiceException
-        */
+     * This function is used to create a new row in the MRI table
+     * @param info the information used to create the row
+     * @return the new partition object that contain the new information used to create the row
+     * @throws MDBCServiceException
+     */
        DatabasePartition createMusicRangeInformation(MusicRangeInformationRow info) throws MDBCServiceException;
 
     /**
@@ -223,48 +227,58 @@ public interface MusicInterface {
        void createMusicRangeDependency(RangeDependency rangeAndDependencies) throws MDBCServiceException;
 
        /**
-        * This function is used to append an index to the redo log in a MRI row
-        * @param partition information related to ownership of partitions, used to verify ownership
-        * @param newRecord index of the new record to be appended to the redo log
-        * @throws MDBCServiceException
-        */
-       void appendToRedoLog(DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException;
+     * This function is used to append an index to the redo log in a MRI row
+     * @param partition information related to ownership of partitions, used to verify ownership
+     * @param newRecord index of the new record to be appended to the redo log
+     * @throws MDBCServiceException
+     */
+       void appendToRedoLog( DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException;
 
-       /**
-        * This functions adds the tx digest to
-        * @param newId id used as index in the MTD table
-        * @param transactionDigest digest that contains all the changes performed in the transaction
-        * @throws MDBCServiceException
-        */
+    /**
+     * This functions adds the tx digest to
+     * @param newId id used as index in the MTD table
+     * @param transactionDigest digest that contains all the changes performed in the transaction
+     * @throws MDBCServiceException
+     */
        void addTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException;
-
+       
        /**
-        * Function used to retrieve a given transaction digest and deserialize it
-        * @param id of the transaction digest to be retrieved
-        * @return the deserialize transaction digest that can be applied to the local SQL database
-        * @throws MDBCServiceException
-        */
+     * This functions adds the eventual tx digest to
+     * @param newId id used as index in the MTD table
+     * @param transactionDigest digest that contains all the changes performed in the transaction
+     * @throws MDBCServiceException
+     */
+       
+    void addEventualTxDigest(MusicTxDigestId newId, String transactionDigest)
+            throws MDBCServiceException;
+
+    /**
+     * Function used to retrieve a given transaction digest and deserialize it
+     * @param id of the transaction digest to be retrieved
+     * @return the deserialize transaction digest that can be applied to the local SQL database
+     * @throws MDBCServiceException
+     */
        HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException;
 
-       /**
-        * Use this functions to verify ownership, and own new ranges
-        * @param ranges the ranges that should be own after calling this function
-        * @param partition current information of the ownership in the system
+    /**
+     * Use this functions to verify ownership, and own new ranges
+     * @param ranges the ranges that should be own after calling this function
+     * @param partition current information of the ownership in the system
      * @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is
      *                required
         * @return an object indicating the status of the own function result
-        * @throws MDBCServiceException
-        */
+     * @throws MDBCServiceException
+     */
        OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID ownOpId) throws MDBCServiceException;
 
-       /**
-        * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation
-        * @param partition information of the partition that is currently being owned
-        * @throws MDBCServiceException
-        */
+    /**
+     * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation
+     * @param partition information of the partition that is currently being owned
+     * @throws MDBCServiceException
+     */
        void relinquishIfRequired(DatabasePartition partition) throws MDBCServiceException;
 
-       /**
+    /**
         * This function is in charge of owning all the ranges requested and creating a new row that show the ownership of all
         * those ranges.
         * @param rangeId new id to be used in the new row
@@ -276,17 +290,17 @@ public interface MusicInterface {
        //OwnershipReturn appendRange(String rangeId, List<Range> ranges, DatabasePartition partition) throws MDBCServiceException;
 
        /**
-        * This functions relinquishes a range
-        * @param ownerId id of the current ownerh
-        * @param rangeId id of the range to be relinquished
-        * @throws MusicLockingException
-        */
+     * This functions relinquishes a range
+     * @param ownerId id of the current ownerh
+     * @param rangeId id of the range to be relinquished
+     * @throws MusicLockingException
+     */
        void relinquish(String ownerId, String rangeId) throws MDBCServiceException;
 
-       /**
-        * This function return all the range indexes that are currently hold by any of the connections in the system
-        * @return list of ids of rows in MRI
-        */
+    /**
+     * This function return all the range indexes that are currently hold by any of the connections in the system
+     * @return list of ids of rows in MRI
+     */
        List<UUID> getPartitionIndexes() throws MDBCServiceException;
 
     /**
@@ -294,7 +308,7 @@ public interface MusicInterface {
      * @param digest this contain all the changes that were perfomed in this digest
      * @throws MDBCServiceException
      */
-       void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException;
+    void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException;
 
     /**
      * This function is in charge of deleting old mri rows that are not longer contain
@@ -303,8 +317,11 @@ public interface MusicInterface {
      */
     void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException;
 
-       List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
+    List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
 
-       OwnershipAndCheckpoint getOwnAndCheck();
+    OwnershipAndCheckpoint getOwnAndCheck();
+    
+    
+    ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException;
 }
 
index 068a64d..6eacb4f 100755 (executable)
@@ -23,27 +23,45 @@ import java.io.IOException;
 import java.io.Reader;
 import java.nio.ByteBuffer;
 import java.sql.Types;
-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.function.BiFunction;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang3.tuple.Pair;
-import org.onap.music.mdbc.*;
-import org.onap.music.mdbc.ownership.Dag;
-import org.onap.music.mdbc.ownership.DagNode;
-import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
-import org.onap.music.mdbc.tables.*;
 import org.json.JSONObject;
+import org.onap.music.datastore.Condition;
 import org.onap.music.datastore.PreparedQueryObject;
+import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.exceptions.MusicLockingException;
 import org.onap.music.exceptions.MusicQueryException;
 import org.onap.music.exceptions.MusicServiceException;
-import org.onap.music.datastore.Condition;
+import org.onap.music.logging.EELFLoggerDelegate;
 import org.onap.music.main.MusicCore;
 import org.onap.music.main.ResultType;
 import org.onap.music.main.ReturnType;
-
-import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.DatabasePartition;
+import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.TableInfo;
+import org.onap.music.mdbc.ownership.Dag;
+import org.onap.music.mdbc.ownership.DagNode;
+import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
+import org.onap.music.mdbc.tables.RangeDependency;
+import org.onap.music.mdbc.tables.StagingTable;
+import org.onap.music.mdbc.tables.TxCommitProgress;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.DataType;
@@ -101,6 +119,7 @@ public class MusicMixin implements MusicInterface {
 
     //\TODO Add logic to change the names when required and create the tables when necessary
     private String musicTxDigestTableName = "musictxdigest";
+    private String musicEventualTxDigestTableName = "musicevetxdigest";
     private String musicRangeInformationTableName = "musicrangeinformation";
     private String musicRangeDependencyTableName = "musicrangedependency";
 
@@ -193,6 +212,7 @@ public class MusicMixin implements MusicInterface {
     private OwnershipAndCheckpoint ownAndCheck;
 
     public MusicMixin() {
+
         //this.logger         = null;
         this.musicAddress   = null;
         this.music_ns       = null;
@@ -272,7 +292,6 @@ public class MusicMixin implements MusicInterface {
         Row row = rs.one();
         return (row == null) ? "UNKNOWN" : row.getUUID("HOST_ID").toString();
     }
-
     private String getAllHostIds() {
         ResultSet results = null;
         try {
@@ -297,7 +316,6 @@ public class MusicMixin implements MusicInterface {
     public String getMixinName() {
         return "cassandra";
     }
-
     /**
      * Do what is needed to close down the MUSIC connection.
      */
@@ -316,6 +334,7 @@ public class MusicMixin implements MusicInterface {
         createKeyspace();
         try {
             createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
+            createMusicEventualTxDigest();
             createMusicRangeInformationTable();
             createMusicRangeDependencyTable();
         }
@@ -1254,6 +1273,8 @@ public class MusicMixin implements MusicInterface {
         return newRanges;
     }
 
+
+
     protected String createAndAssignLock(String fullyQualifiedKey, DatabasePartition partition) throws MDBCServiceException {
         UUID mriIndex = partition.getMRIIndex();
         String lockId;
@@ -1290,8 +1311,21 @@ public class MusicMixin implements MusicInterface {
         }
     }
 
-    public void commitLog(DatabasePartition partition, HashMap<Range,StagingTable> transactionDigest,
-                          String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+
+    /**
+     * Writes the transaction information to metric's txDigest and musicRangeInformation table
+     * This officially commits the transaction globally
+     */
+    @Override
+    public void commitLog(DatabasePartition partition, List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+        
+        // first deal with commit for eventually consistent tables
+        filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
+        
+        // if strong consistency tables are not present in the transaction then return
+        if(partition == null || partition.getMRIIndex() == null)
+            return;
+        
         UUID mriIndex = partition.getMRIIndex();
         String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
         //0. See if reference to lock was already created
@@ -1312,6 +1346,10 @@ public class MusicMixin implements MusicInterface {
         //Add creation type of transaction digest
 
         //1. Push new row to RRT and obtain its index
+        if(transactionDigest == null || transactionDigest.isEmpty()) {
+            return;
+        }
+        
         String serializedTransactionDigest;
         try {
             serializedTransactionDigest = MDBCUtils.toString(transactionDigest);
@@ -1328,6 +1366,66 @@ public class MusicMixin implements MusicInterface {
         appendToRedoLog(partition,digestId);
     }
 
+    private void filterAndAddEventualTxDigest(List<Range> eventualRanges,
+            HashMap<Range, StagingTable> transactionDigest, String txId,
+            TxCommitProgress progressKeeper) throws MDBCServiceException {
+        
+        if(eventualRanges == null || eventualRanges.isEmpty()) {
+            return;
+        }
+        
+        HashMap<Range,StagingTable> eventualTransactionDigest = new HashMap<Range,StagingTable>();
+        
+        for(Range eventualRange: eventualRanges) {
+            transactionDigest.computeIfPresent(eventualRange, new BiFunction<Range,StagingTable,StagingTable>() {
+
+                @Override
+                public StagingTable apply(Range key, StagingTable value) {
+                    eventualTransactionDigest.put(key, value);
+                    //transactionDigest.remove(key);
+                    return null;
+                }
+                
+            });
+        }
+        
+        UUID commitId = getCommitId(txId, progressKeeper);
+        
+        //1. Push new row to RRT
+        
+        String serializedTransactionDigest;
+        if(eventualTransactionDigest != null && !eventualTransactionDigest.isEmpty()) {
+        
+            try {
+                serializedTransactionDigest = MDBCUtils.toString(eventualTransactionDigest);
+            } catch (IOException e) {
+                throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString(), e);
+            }
+            MusicTxDigestId digestId = new MusicTxDigestId(commitId,-1);
+            addEventualTxDigest(digestId, serializedTransactionDigest);
+            
+            if(progressKeeper!= null) {
+                progressKeeper.setRecordId(txId,digestId);
+            }
+        }
+        
+        
+    }
+
+    private UUID getCommitId(String txId, TxCommitProgress progressKeeper)
+            throws MDBCServiceException {
+        UUID commitId;
+        //Generate a local commit id
+        if(progressKeeper.containsTx(txId)) {
+            commitId = progressKeeper.getCommitId(txId);
+        }
+        else{
+            logger.error(EELFLoggerDelegate.errorLogger, "Tx with id "+txId+" was not created in the TxCommitProgress ");
+            throw new MDBCServiceException("Tx with id "+txId+" was not created in the TxCommitProgress ");
+        }
+        return commitId;
+    }
+
     /**
      * @param tableName
      * @param string
@@ -1452,10 +1550,10 @@ public class MusicMixin implements MusicInterface {
         String cql = String.format("SELECT * FROM %s.%s WHERE range = ?;", music_ns, musicRangeDependencyTableName);
         PreparedQueryObject pQueryObject = new PreparedQueryObject();
         pQueryObject.appendQueryString(cql);
-        pQueryObject.addValue(baseRange.table);
+        pQueryObject.addValue(baseRange.getTable());
         Row newRow;
         try {
-            newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.table,null);
+            newRow = executeMusicLockedGet(music_ns, musicRangeDependencyTableName,pQueryObject,baseRange.getTable(),null);
         } catch (MDBCServiceException e) {
             logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
             throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
@@ -1515,12 +1613,12 @@ public class MusicMixin implements MusicInterface {
     public void createMusicRangeDependency(RangeDependency rangeAndDependencies) throws MDBCServiceException {
         StringBuilder insert = new StringBuilder("INSERT INTO ")
                 .append(this.music_ns)
-                       .append('.')
-                       .append(this.musicRangeDependencyTableName)
-                       .append(" (range,dependencies) VALUES ")
-                       .append("(")
-                       .append(rangeAndDependencies.getRange().table)
-                       .append(",{");
+                .append('.')
+                .append(this.musicRangeDependencyTableName)
+                .append(" (range,dependencies) VALUES ")
+                .append("(")
+                .append(rangeAndDependencies.getRange().getTable())
+                .append(",{");
         boolean first=true;
         for(Range r: rangeAndDependencies.dependentRanges()){
             if(first){ first=false; }
@@ -1537,16 +1635,16 @@ public class MusicMixin implements MusicInterface {
 
 
     private UUID createEmptyMriRow(List<Range> rangesCopy) {
-       //TODO: THis should call one of the other createMRIRows
-       UUID id = generateUniqueKey();
+        //TODO: THis should call one of the other createMRIRows
+        UUID id = generateUniqueKey();
         StringBuilder insert = new StringBuilder("INSERT INTO ")
                 .append(this.music_ns)
-                       .append('.')
-                       .append(this.musicRangeInformationTableName)
-                       .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
-                       .append("(")
-                       .append(id)
-                       .append(",{");
+                .append('.')
+                .append(this.musicRangeInformationTableName)
+                .append(" (rangeid,keys,ownerid,metricprocessid,txredolog) VALUES ")
+                .append("(")
+                .append(id)
+                .append(",{");
         boolean first=true;
         for(Range r: rangesCopy){
             if(first){ first=false; }
@@ -1565,7 +1663,7 @@ public class MusicMixin implements MusicInterface {
         MusicCore.eventualPut(query);
         return id;
     }
-    
+        
     
     /**
      * Creates a new empty MRI row
@@ -1635,14 +1733,46 @@ public class MusicMixin implements MusicInterface {
     public void createMusicTxDigest() throws MDBCServiceException {
         createMusicTxDigest(-1);
     }
+    
+    public void createMusicEventualTxDigest() throws MDBCServiceException {
+        createMusicEventualTxDigest(-1);
+    }
 
 
     /**
-     * This function creates the MusicTxDigest table. It contain information related to each transaction committed
+     * This function creates the MusicEveTxDigest table. It contain information related to each eventual transaction committed
      *         * LeaseId: id associated with the lease, text
      *         * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
      *  * TransactionDigest: text that contains all the changes in the transaction
      */
+    private void createMusicEventualTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
+        String tableName = this.musicEventualTxDigestTableName;
+        if(musicTxDigestTableNumber >= 0) {
+            tableName = tableName +
+                "-" +
+                Integer.toString(musicTxDigestTableNumber);
+        }
+        String priKey = "txTimeId";
+        StringBuilder fields = new StringBuilder();
+        fields.append("txid uuid, ");
+        fields.append("transactiondigest text, ");
+        fields.append("txTimeId TIMEUUID ");//notice lack of ','
+        String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", this.music_ns, tableName, fields, priKey);
+        try {
+            executeMusicWriteQuery(this.music_ns,tableName,cql);
+        } catch (MDBCServiceException e) {
+            logger.error("Initialization error: Failure to create redo records table");
+            throw(e);
+        }
+    }
+    
+    
+    /**
+     * This function creates the MusicTxDigest table. It contain information related to each transaction committed
+     *  * LeaseId: id associated with the lease, text
+     *  * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+     *  * TransactionDigest: text that contains all the changes in the transaction
+     */
     private void createMusicTxDigest(int musicTxDigestTableNumber) throws MDBCServiceException {
         String tableName = this.musicTxDigestTableName;
         if(musicTxDigestTableNumber >= 0) {
@@ -1704,6 +1834,35 @@ public class MusicMixin implements MusicInterface {
             throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e);
         }
     }
+    
+    /**
+     * Writes the Eventual transaction history to the evetxDigest
+     */
+    @Override
+    public void addEventualTxDigest(MusicTxDigestId newId, String transactionDigest) throws MDBCServiceException {
+        //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
+        PreparedQueryObject query = new PreparedQueryObject();
+        String cqlQuery = "INSERT INTO " +
+            this.music_ns +
+            '.' +
+            this.musicEventualTxDigestTableName +
+            " (txid,transactiondigest,txTimeId) " +
+            "VALUES (" +
+            newId.txId + ",'" +
+            transactionDigest + 
+            "'," +
+           // "toTimestamp(now())" +
+           "now()" +
+            ");";
+        query.appendQueryString(cqlQuery);
+        //\TODO check if I am not shooting on my own foot
+        try {
+            MusicCore.nonKeyRelatedPut(query,"critical");
+        } catch (MusicServiceException e) {
+            logger.error(EELFLoggerDelegate.errorLogger, "Transaction Digest serialization was invalid for commit "+newId.txId.toString()+ "with error "+e.getErrorMessage());
+            throw new MDBCServiceException("Transaction Digest serialization for commit "+newId.txId.toString(), e);
+        }
+    }
 
     @Override
     public HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException {
@@ -1731,7 +1890,49 @@ public class MusicMixin implements MusicInterface {
         }
         return changes;
     }
+    
+    
+    @Override
+    public ArrayList<HashMap<Range,StagingTable>> getEveTxDigest() throws MDBCServiceException {
+        HashMap<Range,StagingTable> changes;
+        ArrayList<HashMap<Range,StagingTable>> ecDigestList = new ArrayList<HashMap<Range,StagingTable>>();
+            
+            //but was this timestamp is getting added as per post: https://dev.mysql.com/doc/refman/8.0/en/time-zone-leap-seconds.html
+            //Ex1: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest ORDER by ts;
+            //Ex2: SELECT * FROM ectxdigest ORDER by ts; or SELECT * FROM ectxdigest
+          //####### this will pull all records.. but REPLAY will be against specific records once the NODE it back ON-Line.
+            // I should get the last record timestamp so that I can put a where condition.
+            //EX3: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest where UNIX_TIMESTAMP(ts)>UNIX_TIMESTAMP(<<Date/Time value from others>>) ORDER by ts;
+        String cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName);  
+            // Ex 1 & 2 might return millions of records!! things to consider outOfMemory issue, performance issue etc.. How to overcome??
+            // Ex 3: will return less records compare to Ex:1 and Ex:2.
+        PreparedQueryObject pQueryObject = new PreparedQueryObject();
+        pQueryObject.appendQueryString(cql);
 
+            // I need to get a ResultSet of all the records and give each row to the below HashMap.
+        ResultSet rs = executeMusicRead(pQueryObject.toString());
+        while (!rs.isExhausted()) {
+            Row row = rs.one();
+            String digest = row.getString("transactiondigest");
+            
+            try {
+                changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
+            } catch (IOException e) {
+                logger.error("IOException when deserializing digest");
+                throw new MDBCServiceException("Deserializng digest failed with ioexception", e);
+            } catch (ClassNotFoundException e) {
+                logger.error("Deserializng digest failed with an invalid class");
+                throw new MDBCServiceException("Deserializng digest failed with an invalid class", e);
+            }
+
+            ecDigestList.add(changes);
+        }      
+            
+        return ecDigestList;
+    }
+
+
+    
     ResultSet getAllMriCassandraRows() throws MDBCServiceException {
         StringBuilder cqlOperation = new StringBuilder();
         cqlOperation.append("SELECT * FROM ")
@@ -1997,7 +2198,11 @@ public class MusicMixin implements MusicInterface {
 
     @Override
     public OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException {
-        Map<UUID,LockResult> newLocks = new HashMap<>();
+       
+       if(ranges == null || ranges.isEmpty()) 
+              return null;
+       
+       Map<UUID,LockResult> newLocks = new HashMap<>();
         //Init timeout clock
         ownAndCheck.startOwnershipTimeoutClock(opId);
         //Find
@@ -2285,7 +2490,7 @@ public class MusicMixin implements MusicInterface {
 
     @Override
     public void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException{
-        throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented");
+        //throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented");
     }
 
     @Override
index af935ef..25b3de0 100755 (executable)
@@ -17,7 +17,6 @@
  * limitations under the License.
  * ============LICENSE_END======================================================
  */
-
 package org.onap.music.mdbc.mixins;
 
 import java.sql.Connection;
@@ -844,7 +843,7 @@ NEW.field refers to the new value
                
                jdbcConn.setAutoCommit(autocommit);
     }
-
+       
        @Override
        public void disableForeignKeyChecks() throws SQLException {
            Statement disable = jdbcConn.createStatement();
@@ -896,7 +895,7 @@ NEW.field refers to the new value
                switch (op.getOperationType()) {
                case INSERT:
                        sql.append(op.getOperationType() + " INTO ");
-                       sql.append(r.table + " (") ;
+                       sql.append(r.getTable() + " (") ;
                        sep = "";
                        for (String col: cols) {
                                sql.append(sep + col);
@@ -912,7 +911,7 @@ NEW.field refers to the new value
                        break;
                case UPDATE:
                        sql.append(op.getOperationType() + " ");
-                       sql.append(r.table + " SET ");
+                       sql.append(r.getTable() + " SET ");
                        sep="";
                        for (int i=0; i<cols.size(); i++) {
                                sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\"");
@@ -924,7 +923,7 @@ NEW.field refers to the new value
                        break;
                case DELETE:
                        sql.append(op.getOperationType() + " FROM ");
-                       sql.append(r.table + " WHERE ");
+                       sql.append(r.getTable() + " WHERE ");
                        sql.append(getPrimaryKeyConditional(op.getKey()));
                        sql.append(";");
                        break;
index a1228d5..68d1f19 100644 (file)
@@ -359,7 +359,7 @@ public class Dag {
             Range range = rangeAndNodes.getKey();
             Set<DagNode> nodes = rangeAndNodes.getValue();
             if(nodes.size() > 2){
-                logger.error("Range "+range.table+"has more than 2 active rows");
+                logger.error("Range "+range.getTable()+"has more than 2 active rows");
                 throw new MDBCServiceException("Range has more than 2 active rows");
             }
             else if(nodes.size()==2){
index fa15354..1da2d79 100644 (file)
  */
 package org.onap.music.mdbc.tables;
 
-import java.io.IOException;
 import java.sql.SQLException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
-
-import org.json.JSONObject;
-import org.onap.music.datastore.PreparedQueryObject;
 import org.onap.music.exceptions.MDBCServiceException;
-import org.onap.music.exceptions.MusicServiceException;
 import org.onap.music.logging.EELFLoggerDelegate;
 import org.onap.music.mdbc.DatabasePartition;
-import org.onap.music.mdbc.MDBCUtils;
 import org.onap.music.mdbc.MdbcConnection;
-import org.onap.music.mdbc.MdbcServerLogic;
 import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.StateManager;
-import org.onap.music.mdbc.configurations.NodeConfiguration;
-import org.onap.music.mdbc.mixins.MusicMixin;
 import org.onap.music.mdbc.mixins.DBInterface;
 import org.onap.music.mdbc.mixins.MusicInterface;
 
-import com.datastax.driver.core.Row;
-
 public class MusicTxDigest {
        private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicTxDigest.class);
        
@@ -90,7 +76,8 @@ public class MusicTxDigest {
                                for (UUID partition : partitions) {
                                        if (!partition.equals(myPartition.getMRIIndex())) {
                                                try {
-                                                       replayDigestForPartition(mi, partition, dbi);
+                                                       //replayDigestForPartition(mi, partition, dbi);
+                                                   mi.getOwnAndCheck().warmup(mi, dbi, myPartition.getSnapshot());
                                                } catch (MDBCServiceException e) {
                                                        logger.error("Unable to update for partition : " + partition + ". " + e.getMessage());
                                                        continue;
@@ -98,33 +85,71 @@ public class MusicTxDigest {
                                        }
                                }
                        }
+               
+                       //Step 3: ReplayDigest() for E.C conditions
+                       try {
+                               replayDigest(mi,dbi);
+                       } catch (MDBCServiceException e) {
+                               logger.error("Unable to perform Eventual Consistency operations" + e.getMessage());
+                               continue;
+                       }       
+                       
                }
        }
-       
+
        /**
-        * Replay the digest for a given partition
+        * Replay the digest for eventual consistency.
         * @param mi music interface
         * @param partitionId the partition to be replayed
         * @param dbi interface to the database that will replay the operations
         * @throws MDBCServiceException
         */
-       public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException {
-               List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
-               for (MusicTxDigestId txId: partitionsRedoLogTxIds) {
-                       HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId);
+       public static void replayDigest(MusicInterface mi, DBInterface dbi) throws MDBCServiceException {
+                                       //List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
+                                       //From where should I fetch TransactionsIDs ??? from NEW TABLE ?? or EXISING TABLE ?? << what the new SITE_TABLE details??
+                                       // --> It is a new table called ECTxDigest
+                                       //I should sort/ call a method which gives all the entires of  a table based on the time-stamp from Low to High
+
+               ArrayList<HashMap<Range, StagingTable>> ecTxDigest = mi.getEveTxDigest();
+               
+                                       //for (MusicTxDigestId txId: partitionsRedoLogTxIds) { // partitionsRedoLogTxIds --> this comes from new table where timeStamp > currentTimeStamp  ( THIS SHOULD BE lessthan.. which is ASC order)
+                                       //HashMap<Range, StagingTable> transaction = mi2.getEcTxDigest();  // Getting records from musictxdigest TABLE.
+               for (HashMap<Range, StagingTable> transaction: ecTxDigest) {
                        try {
-                               //\TODO do this two operations in parallel
-                               dbi.replayTransaction(transaction);
-                               mi.replayTransaction(transaction);
+                               dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
                        } catch (SQLException e) {
-                               logger.error("Rolling back the entire digest replay. " + partitionId);
+                               logger.error("EC:Rolling back the entire digest replay.");
                                return;
                        }
-                       logger.info("Successfully replayed transaction " + txId);
+                       logger.info("EC: Successfully replayed transaction ");
                }
-               //todo, keep track of where I am in pointer
        }
 
+       
+       /**
+        * Replay the digest for a given partition
+        * @param mi music interface
+        * @param partitionId the partition to be replayed
+        * @param dbi interface to the database that will replay the operations
+        * @throws MDBCServiceException
+        */
+       public static void replayDigestForPartition(MusicInterface mi, UUID partitionId, DBInterface dbi) throws MDBCServiceException {
+        List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
+        for (MusicTxDigestId txId: partitionsRedoLogTxIds) {
+            HashMap<Range, StagingTable> transaction = mi.getTxDigest(txId);
+            try {
+                //\TODO do this two operations in parallel
+                dbi.replayTransaction(transaction);
+                mi.replayTransaction(transaction);
+            } catch (SQLException e) {
+                logger.error("Rolling back the entire digest replay. " + partitionId);
+                return;
+            }
+            logger.info("Successfully replayed transaction " + txId);
+        }
+        //todo, keep track of where I am in pointer
+    }
+
        /**
         * Start the background daemon defined by this object
         * Spawns a new thread and runs "backgroundDaemon"
index 5c50b6d..2d31939 100644 (file)
 
 package org.onap.music.mdbc;
 
-import static org.junit.Assert.assertEquals;
+import org.junit.*;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
 import org.onap.music.mdbc.mixins.MySQLMixin;
 
 import ch.vorburger.mariadb4j.DB;
 
 public class MySQLMixinTest {
 
-       public static final String DATABASE = "mdbcTest";
+       public static final String DATABASE = "mdbctest";
        public static final String TABLE= "Persons";
        public static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS " + TABLE + " (\n" +
             "    PersonID int,\n" +
@@ -75,7 +71,7 @@ public class MySQLMixinTest {
        
        @Test
        public void testGetDataBaseName() throws SQLException {
-               assertEquals(DATABASE, mysqlMixin.getDatabaseName());
+               Assert.assertEquals(DATABASE, mysqlMixin.getDatabaseName());
        }
 
 }
index 0e7b030..753c629 100644 (file)
@@ -158,7 +158,7 @@ public class OwnershipAndCheckpointTest {
         String id = MDBCUtils.generateUniqueKey().toString();
         TxCommitProgress progressKeeper = new TxCommitProgress();
         progressKeeper.createNewTransactionTracker(id ,this.conn);
-        musicMixin.commitLog(partition, stagingTable, id, progressKeeper);
+        musicMixin.commitLog(partition, null, stagingTable, id, progressKeeper);
         TestUtils.unlockRow(keyspace,mriTableName,partition);
     }