Replay Transaction Updates 99/92899/1
authorTschaen, Brendan <ctschaen@att.com>
Tue, 6 Aug 2019 17:37:29 +0000 (13:37 -0400)
committerTschaen, Brendan <ctschaen@att.com>
Tue, 6 Aug 2019 17:40:06 +0000 (13:40 -0400)
Simplify replay logic, already applied datastructure
Remove table from txdigest column

Issue-ID: MUSIC-421
Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
Change-Id: Ic757e6302e05d188704e625c76a77b106e000152

12 files changed:
mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
mdbc-server/src/main/java/org/onap/music/mdbc/ownership/DagNode.java
mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicRangeInformationRow.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigestId.java
mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java
mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java

index 6f097dd..42864ea 100755 (executable)
@@ -193,7 +193,8 @@ public class MdbcConnection implements Connection {
         try {
             partition = mi.splitPartitionIfNecessary(partition, rangesUsed);
         } catch (MDBCServiceException e) {
-            logger.warn(EELFLoggerDelegate.errorLogger, "Failure to split partition, trying to continue",
+            logger.warn(EELFLoggerDelegate.errorLogger,
+                    "Failure to split partition '" + partition.getMRIIndex() + "' trying to continue",
                     AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
         }
         
@@ -541,7 +542,6 @@ public class MdbcConnection implements Connection {
         DatabasePartition tempPartition = own(scRanges, MDBCUtils.getOperationType(tableToQueryType));
         if(tempPartition!=null && tempPartition != partition) {
             this.partition.updateDatabasePartition(tempPartition);
-            statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition);
         }
         dbi.preStatementHook(sql);
     }
@@ -619,7 +619,8 @@ public class MdbcConnection implements Connection {
                 MusicRangeInformationRow row = node.getRow();
                 Map<MusicRangeInformationRow, LockResult> lock = new HashMap<>();
                 lock.put(row, new LockResult(row.getPartitionIndex(), ownershipReturn.getOwnerId(), true, ranges));
-                ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, lock, ownershipReturn.getOwnershipId());
+                ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, ownershipReturn.getOwnershipId());
+                //TODO: need to update pointer in alreadyapplied if a merge happened instead of in prestatement hook
                 newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(),
                     ownershipReturn.getOwnerId());
             }
index 8d42370..b3ec61d 100644 (file)
@@ -36,6 +36,7 @@ import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
 import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
 import org.onap.music.mdbc.tables.MriReference;
 import org.onap.music.mdbc.tables.MusicTxDigestDaemon;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
 import org.onap.music.mdbc.tables.TxCommitProgress;
 
 import java.io.IOException;
@@ -92,7 +93,7 @@ public class StateManager {
     /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */
     private Set<Range> rangesToWarmup;
     /** map of transactions that have already been applied/updated in this sites SQL db */
-    private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
+    private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied;
     private OwnershipAndCheckpoint ownAndCheck;
     private Thread txDaemon ;
 
index e87f7e4..20e1d5d 100644 (file)
@@ -57,6 +57,7 @@ import org.onap.music.main.ResultType;
 import org.onap.music.main.ReturnType;
 import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.MDBCUtils;
+import org.onap.music.mdbc.MdbcConnection;
 import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.StateManager;
 import org.onap.music.mdbc.TableInfo;
@@ -1123,22 +1124,19 @@ public class MusicMixin implements MusicInterface {
      * Build a preparedQueryObject that appends a transaction to the mriTable
      * @param mriTable
      * @param uuid
-     * @param table
      * @param redoUuid
      * @return
      */
-    private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, String table, UUID redoUuid){
+    private PreparedQueryObject createAppendMtxdIndexToMriQuery(String mriTable, UUID uuid, UUID redoUuid){
         PreparedQueryObject query = new PreparedQueryObject();
         StringBuilder appendBuilder = new StringBuilder();
         appendBuilder.append("UPDATE ")
             .append(music_ns)
             .append(".")
             .append(mriTable)
-            .append(" SET txredolog = txredolog +[('")
-            .append(table)
-            .append("',")
+            .append(" SET txredolog = txredolog +[")
             .append(redoUuid)
-            .append(")] WHERE rangeid = ")
+            .append("] WHERE rangeid = ")
             .append(uuid)
             .append(";");
         query.appendQueryString(appendBuilder.toString());
@@ -1342,8 +1340,7 @@ public class MusicMixin implements MusicInterface {
         };
         Callable<Boolean> appendCallable=()-> {
             try {
-                appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicTxDigestTableName,
-                    musicRangeInformationTableName);
+                appendToRedoLog(music_ns, mriIndex, digestId.transactionId, lockId, musicRangeInformationTableName);
                 return true;
             } catch (MDBCServiceException e) {
                 logger.error(EELFLoggerDelegate.errorLogger, "Error creating and pushing tx digest to music",e);
@@ -1369,20 +1366,12 @@ public class MusicMixin implements MusicInterface {
         if (progressKeeper != null) {
             progressKeeper.setRecordId(txId, digestId);
         }
+        
         Set<Range> ranges = partition.getSnapshot();
+        
+        Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
         for(Range r : ranges) {
-            Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
-            if(!alreadyApplied.containsKey(r)){
-                throw new MDBCServiceException("already applied data structure was not updated correctly and range "
-                    +r+" is not contained");
-            }
-            Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r);
-            MriReference key = rowAndIndex.getKey();
-            if(!mriIndex.equals(key.index)){
-                throw new MDBCServiceException("already applied data structure was not updated correctly and range "+
-                    r+" is not pointing to row: "+mriIndex.toString());
-            }
-            alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1));
+            alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), digestId));
         }
     }    
 
@@ -1482,13 +1471,11 @@ public class MusicMixin implements MusicInterface {
 
     static public MusicRangeInformationRow getMRIRowFromCassandraRow(Row newRow){
         UUID partitionIndex = newRow.getUUID("rangeid");
-        List<TupleValue> log = newRow.getList("txredolog",TupleValue.class);
+        List<UUID> log = newRow.getList("txredolog",UUID.class);
         List<MusicTxDigestId> digestIds = new ArrayList<>();
         int index=0;
-        for(TupleValue t: log){
-            //final String tableName = t.getString(0);
-            final UUID id = t.getUUID(1);
-            digestIds.add(new MusicTxDigestId(partitionIndex,id,index++));
+        for(UUID u: log){
+            digestIds.add(new MusicTxDigestId(partitionIndex,u,index++));
         }
         Set<Range> partitions = new HashSet<>();
         Set<String> tables = newRow.getSet("keys",String.class);
@@ -1569,7 +1556,7 @@ public class MusicMixin implements MusicInterface {
         fields.append("prevmrirows set<uuid>, ");
         fields.append("islatest boolean, ");
         //TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
-        fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
+        fields.append("txredolog list<uuid> ");
         String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
            namespace, tableName, fields, priKey);
         try {
@@ -1703,15 +1690,12 @@ public class MusicMixin implements MusicInterface {
     @Override
     public void appendToRedoLog(UUID MRIIndex,  String lockId, MusicTxDigestId newRecord) throws MDBCServiceException {
         logger.debug("Appending to redo log for partition " + MRIIndex + " txId=" + newRecord.transactionId);
-        appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicTxDigestTableName,
-            musicRangeInformationTableName);
+        appendToRedoLog(music_ns,MRIIndex,newRecord.transactionId,lockId,musicRangeInformationTableName);
     }
 
-    public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId,
-                                        String musicTxDigestTableName, String musicRangeInformationTableName)
+    public void appendToRedoLog(String musicNamespace, UUID MRIIndex, UUID transactionId, String lockId, String musicRangeInformationTableName)
         throws MDBCServiceException{
-        PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex,
-            musicTxDigestTableName, transactionId);
+        PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, MRIIndex, transactionId);
         ReturnType returnType = MusicCore.criticalPut(musicNamespace, musicRangeInformationTableName, MRIIndex.toString(),
             appendQuery, lockId, null);
         //returnType.getExecutionInfo()
@@ -2226,13 +2210,15 @@ public class MusicMixin implements MusicInterface {
 
         changeIsLatestToMRI(partition.getMRIIndex(), false, partition.getLockId());
 
-        Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
+        /*
+        Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
         for (Range range: rangesUsed) {
             alreadyApplied.put(range, Pair.of(new MriReference(usedRow.getPartitionIndex()), -1));
         }
         for (Range range: rangesNotUsed) {
             alreadyApplied.put(range, Pair.of(new MriReference(unusedRow.getPartitionIndex()), -1));
         }
+        */
 
         //release/update old partition info
         relinquish(unusedRow.getDBPartition());
index 3af6f0f..2c501dc 100755 (executable)
@@ -247,7 +247,10 @@ public class MySQLMixin implements DBInterface {
         logger.debug(EELFLoggerDelegate.applicationLogger, "getSQLTableSet returning: " + set);
         Set<Range> rangeSet = new HashSet<>();
         for (String table : set) {
-            rangeSet.add(new Range(table));
+            if (getReservedTblNames().contains(table)) {
+                // Don't create triggers for the table the triggers write into!!!
+                rangeSet.add(new Range(table));
+            }
         }
         return rangeSet;
     }
index 9d1685c..142cb34 100644 (file)
@@ -32,6 +32,7 @@ import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.tables.MriReference;
 import org.onap.music.mdbc.tables.MriRowComparator;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
 
 public class Dag {
 
@@ -145,7 +146,6 @@ public class Dag {
         if(!readyInit){
             initApplyDatastructures();
         }
-        Set<Range> rangesSet = new HashSet<>(ranges);
         while(!toApplyNodes.isEmpty()){
             DagNode nextNode = toApplyNodes.poll();
             List<DagNode> outgoing = nextNode.getOutgoingEdges();
@@ -155,7 +155,7 @@ public class Dag {
                     toApplyNodes.add(out);
                 }
             }
-            if(!nextNode.wasApplied(rangesSet)){
+            if(!nextNode.wasApplied(ranges)){
                 return nextNode;
             }
         }
@@ -233,23 +233,23 @@ public class Dag {
         return toApplyNodes.isEmpty();
     }
 
-    public void setAlreadyApplied(Map<Range, Pair<MriReference,Integer>> alreadyApplied, Set<Range> ranges)
+    public void setAlreadyApplied(Map<Range, Pair<MriReference,MusicTxDigestId>> alreadyApplied, Set<Range> ranges)
         throws MDBCServiceException {
-        for(Map.Entry<UUID,DagNode> node : nodes.entrySet()){
+        for (DagNode node: nodes.values()) {
             Set<Range> intersection = new HashSet<>(ranges);
-            intersection.retainAll(node.getValue().getRangeSet());
+            intersection.retainAll(node.getRangeSet());
             for(Range r : intersection){
                 if(alreadyApplied.containsKey(r)){
-                    final Pair<MriReference, Integer> appliedPair = alreadyApplied.get(r);
+                    final Pair<MriReference, MusicTxDigestId> appliedPair = alreadyApplied.get(r);
                     final MriReference appliedRow = appliedPair.getKey();
-                    final int index = appliedPair.getValue();
+                    final int index = appliedPair.getValue().index;
                     final long appliedTimestamp = appliedRow.getTimestamp();
-                    final long nodeTimestamp = node.getValue().getTimestamp();
+                    final long nodeTimestamp = node.getTimestamp();
                     if(appliedTimestamp > nodeTimestamp){
-                        setReady(node.getValue(),r);
+                        setReady(node,r);
                     }
                     else if(appliedTimestamp == nodeTimestamp){
-                        setPartiallyReady(node.getValue(),r,index);
+                        setPartiallyReady(node,r,index);
                     }
                 }
             }
index 78c68e1..5e4c899 100644 (file)
@@ -30,6 +30,7 @@ import java.util.UUID;
 import org.apache.commons.lang3.tuple.Pair;
 import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MriReference;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
 import org.onap.music.mdbc.tables.MusicTxDigestId;
 
@@ -73,6 +74,10 @@ public class DagNode {
         return owned;
     }
 
+    /**
+     * 
+     * @return the row's MRI Index represented by this dagnode
+     */
     public UUID getId(){
         return row.getPartitionIndex();
     }
@@ -149,20 +154,25 @@ public class DagNode {
         currentIndex = currentIndex+1;
     }
 
-    public synchronized Pair<MusicTxDigestId, List<Range>> nextNotAppliedTransaction(Set<Range> ranges){
+    /**
+     * 
+     * @param ranges
+     * @return the index of the next transaction to replay and the ranges needed for this transaction
+     */
+    public synchronized Pair<MusicTxDigestId, Set<Range>> nextNotAppliedTransaction(Set<Range> ranges){
         if(row.getRedoLog().isEmpty()) return null;
         if(!applyInit){
             initializeApply(ranges);
         }
         final List<MusicTxDigestId> redoLog = row.getRedoLog();
         if(currentIndex  < redoLog.size()){
-            List<Range> responseRanges= new ArrayList<>();
+            Set<Range> responseRanges= new HashSet<>();
             startIndex.forEach((r, index) -> {
                 if(index < currentIndex){
                    responseRanges.add(r);
                 }
             });
-            return Pair.of(redoLog.get(currentIndex++),responseRanges);
+            return Pair.of(row.getRedoLog().get(currentIndex++),responseRanges);
         }
         return null;
     }
@@ -179,7 +189,7 @@ public class DagNode {
         if(row.getRedoLog().isEmpty()) return true;
         if(!applyInit){
             initializeApply(ranges);
-        }
+        }        
         return currentIndex >= row.getRedoLog().size();
     }
 
@@ -194,11 +204,13 @@ public class DagNode {
         if(o == null) return false;
         if(!(o instanceof DagNode)) return false;
         DagNode other = (DagNode) o;
-        return other.row.getPartitionIndex().equals(this.row.getPartitionIndex());
+        return other.row.equals(this.row);
     }
 
     @Override
     public int hashCode(){
         return row.getPartitionIndex().hashCode();
     }
+
+
 }
index c95644b..00180a0 100644 (file)
@@ -45,8 +45,7 @@ public class OwnershipAndCheckpoint{
 
     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class);
     private Lock checkpointLock;
-    private AtomicBoolean change;
-    private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
+    private Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied;
     private Map<UUID,Long> ownershipBeginTime;
     private long timeoutInMs;
 
@@ -54,8 +53,7 @@ public class OwnershipAndCheckpoint{
       this(new HashMap<>(),Long.MAX_VALUE);
     }
 
-    public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, Integer>> alreadyApplied, long timeoutInMs){
-        change = new AtomicBoolean(true);
+    public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied, long timeoutInMs){
         checkpointLock = new ReentrantLock();
         this.alreadyApplied = alreadyApplied;
         ownershipBeginTime = new HashMap<>();
@@ -130,20 +128,17 @@ public class OwnershipAndCheckpoint{
      * @param di
      * @param extendedDag
      * @param ranges
-     * @param locks
      * @param ownOpId
      * @throws MDBCServiceException
      */
-    public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges,
-        Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException {
+    public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, Set<Range> ranges, UUID ownOpId)
+            throws MDBCServiceException {
         if(ranges.isEmpty()){
             return;
         }
         try {
             checkpointLock.lock();
-            change.set(true);
-            Set<Range> rangesSet = new HashSet<>(ranges);
-            extendedDag.setAlreadyApplied(alreadyApplied, rangesSet);
+            extendedDag.setAlreadyApplied(alreadyApplied, ranges);
             applyRequiredChanges(mi, di, extendedDag, ranges, ownOpId);
         }
         catch(MDBCServiceException e){
@@ -163,18 +158,18 @@ public class OwnershipAndCheckpoint{
         }
     }
 
-    private void disableForeignKeys(DBInterface di) throws MDBCServiceException {
+    private void disableForeignKeys(DBInterface dbi) throws MDBCServiceException {
         try {
-            di.disableForeignKeyChecks();
+            dbi.disableForeignKeyChecks();
         } catch (SQLException e) {
             throw new MDBCServiceException("Error disable foreign keys checks",e);
         }
     }
 
-    private void applyTxDigest(DBInterface di, StagingTable txDigest)
+    private void applyTxDigest(DBInterface dbi, StagingTable txDigest)
         throws MDBCServiceException {
         try {
-            di.applyTxDigest(txDigest);
+            dbi.applyTxDigest(txDigest);
         } catch (SQLException e) {
             throw new MDBCServiceException("Error applying tx digest in local SQL",e);
         }
@@ -191,39 +186,28 @@ public class OwnershipAndCheckpoint{
         if(rangesToWarmup.isEmpty()){
             return;
         }
-        boolean ready = false;
-        change.set(true);
-        Set<Range> rangeSet = new HashSet<Range>(rangesToWarmup);
         Dag dag = new Dag(false);
-        while(!ready){
-            if(change.get()){
-                change.set(false);
-                final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false);
-                dag = Dag.getDag(rows,rangesToWarmup);
-            }
-            else if(!dag.applied()){
-                DagNode node = dag.nextToApply(rangesToWarmup);
-                if(node!=null) {
-                    Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
-                    while (pair != null) {
+        final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false);
+        dag = Dag.getDag(rows,rangesToWarmup);
+        dag.setAlreadyApplied(alreadyApplied, rangesToWarmup);
+        while(!dag.applied()){
+            DagNode node = dag.nextToApply(rangesToWarmup);
+            if(node!=null) {
+                Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesToWarmup);
+                while (pair != null) {
+                    checkpointLock.lock();
+                    try {
                         disableForeignKeys(di);
-                        checkpointLock.lock();
-                        if (change.get()) {
-                            enableForeignKeys(di);
-                            checkpointLock.unlock();
-                            break;
-                        } else {
-                            applyDigestAndUpdateDataStructures(mi, di, node, pair);
-                        }
-                        pair = node.nextNotAppliedTransaction(rangeSet);
+                        applyDigestAndUpdateDataStructures(mi, di, node, pair.getLeft(), pair.getRight());
+                        pair = node.nextNotAppliedTransaction(rangesToWarmup);
                         enableForeignKeys(di);
+                    } catch (MDBCServiceException e) {
                         checkpointLock.unlock();
+                        throw e;
                     }
+                    checkpointLock.unlock();
                 }
             }
-            else{
-                ready = true;
-            }
         }
     }
 
@@ -235,25 +219,54 @@ public class OwnershipAndCheckpoint{
      * @param pair
      * @throws MDBCServiceException
      */
-    private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface di, DagNode node,
-                                                    Pair<MusicTxDigestId, List<Range>> pair) throws MDBCServiceException {
+    private void applyDigestAndUpdateDataStructures(MusicInterface mi, DBInterface dbi, DagNode node,
+            MusicTxDigestId digestId, Set<Range> ranges) throws MDBCServiceException {
+        if (alreadyReplayed(node, digestId)) {
+            return;
+        }
+
         final StagingTable txDigest;
         try {
-            txDigest = mi.getTxDigest(pair.getKey());
+            txDigest = mi.getTxDigest(digestId);
         } catch (MDBCServiceException e) {
             logger.warn("Transaction digest was not found, this could be caused by a failure of the previous owner"
                 +"And would normally only happen as the last ID of the corresponding redo log. Please check that this is the"
-                +" case for txID "+pair.getKey().transactionId.toString());
+                +" case for txID "+digestId.transactionId.toString());
             return;
         }
-        applyTxDigest(di, txDigest);
-        for (Range r : pair.getValue()) {
+        applyTxDigest(dbi, txDigest);
+        for (Range r : ranges) {
             MusicRangeInformationRow row = node.getRow();
-            alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
+            alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), digestId));
             
-            updateCheckpointLocations(mi, di, r, row.getPartitionIndex(), pair.getKey().index);
+            updateCheckpointLocations(mi, dbi, r, row.getPartitionIndex(), digestId);
         }
     }
+    
+    /**
+     * Determine if this musictxdigest id has already been replayed
+     * @param node
+     * @param redoLogIndex
+     * @return true if alreadyApplied is past this node/redolog, false if it hasn't been replayed
+     */
+    public boolean alreadyReplayed(DagNode node, MusicTxDigestId txdigest) {
+        int index = node.getRow().getRedoLog().indexOf(txdigest);
+        for (Range range: node.getRangeSet()) {
+            Pair<MriReference, MusicTxDigestId> applied = alreadyApplied.get(range);
+            if (applied==null) {
+                return false;
+            }
+            MriReference appliedMriRef = applied.getLeft();
+            MusicTxDigestId appliedDigest = applied.getRight();
+            int appliedIndex = node.getRow().getRedoLog().indexOf(appliedDigest);
+            if (appliedMriRef==null || appliedMriRef.getTimestamp() < node.getTimestamp()
+                    || (appliedMriRef.getTimestamp() == node.getTimestamp()
+                            && appliedIndex < index)) {
+                return false;
+            }
+        }
+        return true;
+    }
 
     /**
      * Update external checkpoint markers in sql db and music
@@ -263,9 +276,9 @@ public class OwnershipAndCheckpoint{
      * @param partitionIndex
      * @param index
      */
-    private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, int index) {
-        dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
-        mi.updateCheckpointLocations(r, Pair.of(partitionIndex, index));
+    private void updateCheckpointLocations(MusicInterface mi, DBInterface dbi, Range r, UUID partitionIndex, MusicTxDigestId txdigest) {
+        dbi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index));
+        mi.updateCheckpointLocations(r, Pair.of(partitionIndex, txdigest.index));
     }
 
     /**
@@ -279,15 +292,14 @@ public class OwnershipAndCheckpoint{
      */
     private void applyRequiredChanges(MusicInterface mi, DBInterface db, Dag extendedDag, Set<Range> ranges, UUID ownOpId)
         throws MDBCServiceException {
-        Set<Range> rangeSet = new HashSet<Range>(ranges);
         disableForeignKeys(db);
         while(!extendedDag.applied()){
             DagNode node = extendedDag.nextToApply(ranges);
             if(node!=null) {
-                Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
+                Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(ranges);
                 while (pair != null) {
-                    applyDigestAndUpdateDataStructures(mi, db, node, pair);
-                    pair = node.nextNotAppliedTransaction(rangeSet);
+                    applyDigestAndUpdateDataStructures(mi, db, node, pair.getLeft(), pair.getRight());
+                    pair = node.nextNotAppliedTransaction(ranges);
                     if (timeout(ownOpId)) {
                         enableForeignKeys(db);
                         throw new MDBCServiceException("Timeout apply changes to local dbi");
@@ -346,7 +358,7 @@ public class OwnershipAndCheckpoint{
         }
         Set<Range> allRanges = currentlyOwn.getAllRanges();
         //TODO: we shouldn't need to go back to music at this point
-        List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, new HashSet<>(allRanges), true);
+        List<MusicRangeInformationRow> latestRows = extractRowsForRange(mi, allRanges, true);
         currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,latestRows));
         return mi.mergeLatestRowsIfNecessary(currentlyOwn,locksForOwnership,opId);
     }
@@ -462,15 +474,6 @@ public class OwnershipAndCheckpoint{
         
     }
     
-    
-    public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
-        Set<Range> snapshot = partition.getSnapshot();
-        UUID row = partition.getMRIIndex();
-        for(Range r : snapshot){
-            alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
-        }
-    }
-    
     // \TODO merge with dag code
     private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException {
         Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>();
@@ -495,7 +498,7 @@ public class OwnershipAndCheckpoint{
     }
 
             
-    public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() {
+    public Map<Range, Pair<MriReference, MusicTxDigestId>> getAlreadyApplied() {
         return this.alreadyApplied;
     } 
     
index 8aad335..9383ac5 100755 (executable)
@@ -30,4 +30,7 @@ public final class MriReference {
 
        public long getTimestamp() { return index.timestamp();}
 
+       public String toString() {
+           return index.toString();
+       }
 }
index de711ef..8c95047 100755 (executable)
@@ -96,7 +96,7 @@ public final class MusicRangeInformationRow implements Comparable<MusicRangeInfo
         if(o == null) return false;
         if(!(o instanceof MusicRangeInformationRow)) return false;
         MusicRangeInformationRow other = (MusicRangeInformationRow) o;
-        return other.getPartitionIndex().equals(this.getPartitionIndex());
+        return other.getPartitionIndex().equals(this.getPartitionIndex()) && other.getRedoLog().equals(this.getRedoLog());
     }
 
     @Override
index db9e455..8544b47 100755 (executable)
@@ -55,4 +55,8 @@ public final class MusicTxDigestId {
     public int hashCode(){
         return transactionId.hashCode();
     }
+    
+    public String toString() {
+        return this.transactionId.toString();
+    }
 }
index ee50dca..afe378e 100644 (file)
@@ -170,7 +170,7 @@ public class DagTest {
         HashSet<Range> rangesSet = new HashSet<>(ranges);
         while(!dag.applied()){
             DagNode node = dag.nextToApply(ranges);
-            Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
+            Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
             int transactionCounter = 0;
             while(pair!=null) {
                 assertNotEquals(1,transactionCounter);
@@ -178,9 +178,10 @@ public class DagTest {
                 MusicTxDigestId id = row.getRedoLog().get(transactionCounter);
                 assertEquals(id,pair.getKey());
                 assertEquals(0,pair.getKey().index);
-                List<Range> value = pair.getValue();
+                Set<Range> value = pair.getValue();
                 assertEquals(1,value.size());
-                assertEquals(new Range("schema.range1"),value.get(0));
+                assertTrue(value.contains(new Range("schema.range1")));
+                //assertEquals(new Range("schema.range1"),value.get(0));
                 pair = node.nextNotAppliedTransaction(rangesSet);
                 transactionCounter++;
             }
@@ -192,7 +193,7 @@ public class DagTest {
 
     @Test
     public void nextToApply2() throws InterruptedException, MDBCServiceException {
-        Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
+        Map<Range, Pair<MriReference, MusicTxDigestId>> alreadyApplied = new HashMap<>();
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         Set<Range> ranges = new HashSet<>( Arrays.asList(
             new Range("schema.range1")
@@ -207,7 +208,7 @@ public class DagTest {
             new MusicTxDigestId(null,MDBCUtils.generateUniqueKey(),1)
         ));
         MusicRangeInformationRow newRow = createNewRow(new HashSet<>(ranges), "", false, redo2);
-        alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
+        alreadyApplied.put(new Range("schema.range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), redo1.get(0)));
         rows.add(newRow);
         MILLISECONDS.sleep(10);
         List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(
@@ -220,7 +221,7 @@ public class DagTest {
         int nodeCounter = 1;
         while(!dag.applied()){
             DagNode node = dag.nextToApply(ranges);
-            Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
+            Pair<MusicTxDigestId, Set<Range>> pair = node.nextNotAppliedTransaction(rangesSet);
             int transactionCounter = 0;
             while(pair!=null) {
                 assertNotEquals(1,transactionCounter);
@@ -228,9 +229,10 @@ public class DagTest {
                 MusicTxDigestId id = row.getRedoLog().get(2-nodeCounter);
                 assertEquals(id,pair.getKey());
                 assertEquals(2-nodeCounter,pair.getKey().index);
-                List<Range> value = pair.getValue();
+                Set<Range> value = pair.getValue();
                 assertEquals(1,value.size());
-                assertEquals(new Range("schema.range1"),value.get(0));
+                assertTrue(value.contains(new Range("schema.range1")));
+                //assertEquals(new Range("schema.range1"),value.get(0));
                 pair = node.nextNotAppliedTransaction(rangesSet);
                 transactionCounter++;
             }
index 2443d1e..1c9eb11 100644 (file)
@@ -151,7 +151,6 @@ public class OwnershipAndCheckpointTest {
         String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+
             "(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');";
         StagingTable stagingTable = new StagingTable();
-        ownAndCheck.reloadAlreadyApplied(partition);
         final Statement executeStatement = this.conn.createStatement();
         executeStatement.execute(sqlOperation);
         this.conn.commit();
@@ -224,7 +223,7 @@ public class OwnershipAndCheckpointTest {
             locks.put(own.getDag().getNode(own.getRangeId()).getRow(),
                 new LockResult(own.getRangeId(), own.getOwnerId(), true,
                     ranges));
-            ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, locks, ownOpId);
+            ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, ownOpId);
         }
 
         checkData();