Replay transaction bugfix 53/91953/2
authorTschaen, Brendan <ctschaen@att.com>
Wed, 24 Jul 2019 15:13:14 +0000 (11:13 -0400)
committerTschaen, Brendan <ctschaen@att.com>
Wed, 24 Jul 2019 15:40:32 +0000 (11:40 -0400)
Must replay all transaction in current partition's history

Issue-ID: MUSIC-464
Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
Change-Id: I4e0cc7f1aca0b8506f2ca617169a427cc85bbbeb

mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/PostgresMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestDaemon.java
mdbc-server/src/test/java/org/onap/music/mdbc/mixins/PostgresMixinTest.java

index 745307c..15dd456 100755 (executable)
@@ -134,13 +134,13 @@ public interface DBInterface {
         * @throws SQLException if replay cannot occur correctly
         * @throws MDBCServiceException 
         */
-       void replayTransaction(StagingTable digest, Set<Range> ranges) throws SQLException, MDBCServiceException;
+       void replayTransaction(StagingTable digest) throws SQLException, MDBCServiceException;
 
        void disableForeignKeyChecks() throws SQLException;
 
        void enableForeignKeyChecks() throws SQLException;
 
-       void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException;
+       void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException;
 
        Connection getSQLConnection();
 
index 8cab635..d4581d7 100755 (executable)
@@ -891,7 +891,7 @@ public class MySQLMixin implements DBInterface {
      * @param transaction - base 64 encoded, serialized digest
      * @throws MDBCServiceException
      */
-    public void replayTransaction(StagingTable transaction, Set<Range> ranges)
+    public void replayTransaction(StagingTable transaction)
             throws SQLException, MDBCServiceException {
         boolean autocommit = jdbcConn.getAutoCommit();
         jdbcConn.setAutoCommit(false);
@@ -899,7 +899,6 @@ public class MySQLMixin implements DBInterface {
         ArrayList<Operation> opList = transaction.getOperationList();
 
         for (Operation op : opList) {
-            if (Range.overlaps(ranges, op.getTable())) {
                 try {
                     replayOperationIntoDB(jdbcStmt, op);
                 } catch (SQLException | MDBCServiceException e) {
@@ -909,7 +908,6 @@ public class MySQLMixin implements DBInterface {
                     jdbcConn.rollback();
                     throw e;
                 }
-            }
         }
 
         clearReplayedOperations(jdbcStmt);
@@ -934,8 +932,8 @@ public class MySQLMixin implements DBInterface {
     }
 
     @Override
-    public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException {
-        replayTransaction(txDigest, ranges);
+    public void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException {
+        replayTransaction(txDigest);
     }
 
     /**
@@ -1183,5 +1181,4 @@ public class MySQLMixin implements DBInterface {
             logger.error(EELFLoggerDelegate.errorLogger, "initTables: problem creating th mdbc tables!");
         }
     }
-
 }
\ No newline at end of file
index 4afaa71..6d2d4cf 100755 (executable)
@@ -817,7 +817,7 @@ public class PostgresMixin implements DBInterface {
      * @param transaction - base 64 encoded, serialized digest
      */
     @Override
-    public void replayTransaction(StagingTable transaction, Set<Range> ranges)
+    public void replayTransaction(StagingTable transaction)
             throws SQLException, MDBCServiceException {
         boolean autocommit = jdbcConn.getAutoCommit();
         jdbcConn.setAutoCommit(false);
@@ -825,7 +825,6 @@ public class PostgresMixin implements DBInterface {
         final ArrayList<Operation> opList = transaction.getOperationList();
 
         for (Operation op : opList) {
-            if (Range.overlaps(ranges, op.getTable())) {
                 try {
                     replayOperationIntoDB(jdbcStmt, op);
                 } catch (SQLException | MDBCServiceException e) {
@@ -835,7 +834,6 @@ public class PostgresMixin implements DBInterface {
                     jdbcConn.rollback();
                     throw e;
                 }
-            }
         }
 
         clearReplayedOperations(jdbcStmt);
@@ -859,8 +857,8 @@ public class PostgresMixin implements DBInterface {
     }
 
     @Override
-    public void applyTxDigest(StagingTable txDigest, Set<Range> ranges) throws SQLException, MDBCServiceException {
-        replayTransaction(txDigest, ranges);
+    public void applyTxDigest(StagingTable txDigest) throws SQLException, MDBCServiceException {
+        replayTransaction(txDigest);
     }
 
     /**
index b848964..658d127 100644 (file)
@@ -171,10 +171,10 @@ public class OwnershipAndCheckpoint{
         }
     }
 
-    private void applyTxDigest(Set<Range> ranges, DBInterface di, StagingTable txDigest)
+    private void applyTxDigest(DBInterface di, StagingTable txDigest)
         throws MDBCServiceException {
         try {
-            di.applyTxDigest(txDigest,ranges);
+            di.applyTxDigest(txDigest);
         } catch (SQLException e) {
             throw new MDBCServiceException("Error applying tx digest in local SQL",e);
         }
@@ -213,7 +213,7 @@ public class OwnershipAndCheckpoint{
                             checkpointLock.unlock();
                             break;
                         } else {
-                            applyDigestAndUpdateDataStructures(mi, di, rangesToWarmup, node, pair);
+                            applyDigestAndUpdateDataStructures(mi, di, node, pair);
                         }
                         pair = node.nextNotAppliedTransaction(rangeSet);
                         enableForeignKeys(di);
@@ -228,15 +228,14 @@ public class OwnershipAndCheckpoint{
     }
 
     /**
-     * Apply tx digest for ranges, update checkpoint location (alreadyApplied)
+     * Apply tx digest for dagnode update checkpoint location (alreadyApplied)
      * @param mi
      * @param di
-     * @param ranges
      * @param node
      * @param pair
      * @throws MDBCServiceException
      */
-    private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, Set<Range> ranges, DagNode node,
+    private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, DagNode node,
                                                     Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException {
         final StagingTable txDigest;
         try {
@@ -247,7 +246,7 @@ public class OwnershipAndCheckpoint{
                 +" case for txID "+pair.getKey().transactionId.toString());
             return;
         }
-        applyTxDigest(ranges,di, txDigest);
+        applyTxDigest(di, txDigest);
         for (Range r : pair.getValue()) {
             MusicRangeInformationRow row = node.getRow();
             alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
@@ -287,7 +286,7 @@ public class OwnershipAndCheckpoint{
             if(node!=null) {
                 Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
                 while (pair != null) {
-                    applyDigestAndUpdateDataStructures(mi, db, ranges, node, pair);
+                    applyDigestAndUpdateDataStructures(mi, db, node, pair);
                     pair = node.nextNotAppliedTransaction(rangeSet);
                     if (timeout(ownOpId)) {
                         enableForeignKeys(db);
index 6f95d3c..a1ef346 100644 (file)
@@ -60,8 +60,7 @@ public class MusicTxDigestDaemon implements Runnable {
                for (UUID txTimeID : keys) {
                        transaction = ecDigestInformation.get(txTimeID);
                        try {
-                               dbi.replayTransaction(transaction,
-                                       ranges); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
+                               dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
                        } catch (SQLException e) {
                                logger.error("EC:Rolling back the entire digest replay.");
                                return;
index a1cf2b1..d44ec8f 100644 (file)
@@ -211,7 +211,7 @@ public class PostgresMixinTest {
         Set<Range> ranges = new HashSet<>();
         ranges.add(new Range("public.testtable"));
         try {
-            mixin.applyTxDigest(st,ranges);
+            mixin.applyTxDigest(st);
         } catch (SQLException|MDBCServiceException e) {
             e.printStackTrace();
             fail();