Keyspace creation, and handling of mdbc_cuid 32/77532/1
authorEnrique Saurez <esaurez@gatech.edu>
Sun, 27 Jan 2019 03:51:56 +0000 (22:51 -0500)
committerEnrique Saurez <enrique.saurez@gmail.com>
Tue, 29 Jan 2019 19:20:49 +0000 (14:20 -0500)
Issue-ID: MUSIC-281
Signed-off-by: Enrique Saurez<enrique.saurez@gmail.com>
Change-Id: I3bc685e30e064c1c93386301385115632b179449

15 files changed:
mdbc-benchmark/src/main/java/org/onap/music/mdbc/Benchmark.java
mdbc-server/src/main/java/org/onap/music/mdbc/DatabasePartition.java
mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
mdbc-server/src/main/java/org/onap/music/mdbc/configurations/TablesConfiguration.java
mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/ownership/Dag.java
mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MriReference.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
mdbc-server/src/main/resources/music.properties
mdbc-server/src/test/java/org/onap/music/mdbc/ownership/DagTest.java

index 7baeaa2..a87f8da 100644 (file)
@@ -346,7 +346,9 @@ public class Benchmark
         //iterations
         Map<Integer, List<Long>> results = new HashMap<>();
         final int totalIterations = 20;
-        final int[] rows = { 1,10,100, 500, 1000};
+        final int[] rows = { 1,10,50,80//};
+                           ,100,200,300,400};//, 500};
+                         //, 1000};
         for(int row: rows) {
             System.out.println("Running for rows: "+Integer.toString(row));
             results.put(row,new ArrayList<Long>());
index ff8eb80..4204960 100755 (executable)
@@ -73,7 +73,7 @@ public class DatabasePartition {
      * This function is used to change the contents of this, with the contents of a different object
      * @param otherPartition partition that is used to substitute the local contents
      */
-    public void updateDatabasePartition(DatabasePartition otherPartition){
+    public synchronized void updateDatabasePartition(DatabasePartition otherPartition){
         musicRangeInformationIndex = otherPartition.musicRangeInformationIndex;//Index that can be obtained either from
         lockId = otherPartition.lockId;
         ranges = otherPartition.ranges;
@@ -90,21 +90,21 @@ public class DatabasePartition {
     }
 
 
-    public boolean isLocked(){return lockId != null && !lockId.isEmpty(); }
+    public synchronized boolean isLocked(){return lockId != null && !lockId.isEmpty(); }
 
-    public boolean isReady() {
+    public synchronized boolean isReady() {
         return ready;
     }
 
-    public void setReady(boolean ready) {
+    public synchronized void setReady(boolean ready) {
         this.ready = ready;
     }
 
-    public UUID getMRIIndex() {
+    public synchronized UUID getMRIIndex() {
         return musicRangeInformationIndex;
     }
 
-    public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) {
+    public synchronized void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) {
         this.musicRangeInformationIndex = musicRangeInformationIndex;
     }
 
@@ -179,15 +179,15 @@ public class DatabasePartition {
         return range;
     }
 
-    public String getLockId() {
+    public synchronized String getLockId() {
         return lockId;
     }
 
-    public void setLockId(String lockId) {
+    public synchronized void setLockId(String lockId) {
         this.lockId = lockId;
     }
 
-    public boolean isContained(Range range){
+    public synchronized boolean isContained(Range range){
         for(Range r: ranges){
             if(r.overlaps(range)){
                 return true;
index 69a678b..d336eef 100755 (executable)
@@ -504,6 +504,7 @@ public class MdbcConnection implements Connection {
         DatabasePartition tempPartition = own(scQueryTables);
         if(tempPartition!=null && tempPartition != partition) {
             this.partition.updateDatabasePartition(tempPartition);
+            mi.reloadAlreadyApplied(this.partition);
         }
       dbi.preStatementHook(sql);
     }
index 1f722f1..b403dd2 100755 (executable)
@@ -45,6 +45,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * \TODO Implement an interface for the server logic and a factory 
@@ -79,7 +82,10 @@ public class StateManager {
     /** Identifier for this server instance */
     private String mdbcServerName;
     private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
+    private final Lock eventualLock  = new ReentrantLock();
     private List<Range> eventualRanges;
+    private final Lock warmupLock = new ReentrantLock();
+    private List<Range> warmupRanges;
 
        public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
         this.sqlDBName = sqlDBName;
@@ -87,7 +93,7 @@ public class StateManager {
         this.info = info;
         this.mdbcServerName = mdbcServerName;
     
-        this.connectionRanges = new HashMap<>();
+        this.connectionRanges = new ConcurrentHashMap<>();
         this.transactionInfo = new TxCommitProgress();
         //\fixme this might not be used, delete?
         try {
@@ -145,17 +151,52 @@ public class StateManager {
        return this.musicInterface;
     }
     
-    public List<DatabasePartition> getRanges() {
+    public List<DatabasePartition> getPartitions() {
         return new ArrayList<>(connectionRanges.values());
        }
 
+       public List<Range> getWarmupRanges(){
+        warmupLock.lock();
+        List<Range> returnArray;
+        try {
+            if(warmupRanges!=null) {
+                returnArray = new ArrayList<>(warmupRanges);
+            }
+            else{
+                returnArray = null;
+            }
+        }
+        finally{
+           warmupLock.unlock();
+        }
+        return returnArray;
+    }
 
     public List<Range> getEventualRanges() {
-        return eventualRanges;
+        eventualLock.lock();
+        List<Range> returnArray;
+        try {
+            if(eventualRanges!=null){
+                returnArray = new ArrayList<>(eventualRanges);
+            }
+            else{
+                returnArray= null;
+            }
+        }
+        finally{
+            eventualLock.unlock();
+        }
+        return returnArray;
     }
 
     public void setEventualRanges(List<Range> eventualRanges) {
-        this.eventualRanges = eventualRanges;
+        eventualLock.lock();
+        try {
+            this.eventualRanges = eventualRanges;
+        }
+        finally{
+            eventualLock.unlock();
+        }
     }
 
     public void closeConnection(String connectionId){
@@ -267,4 +308,14 @@ public class StateManager {
         }
 
     }
+
+    public void setWarmupRanges(List<Range> warmupRanges) {
+        warmupLock.lock();
+        try {
+            this.warmupRanges = warmupRanges;
+        }
+        finally{
+            warmupLock.unlock();
+        }
+    }
 }
index 8497911..a9d179f 100755 (executable)
@@ -51,6 +51,7 @@ public class TablesConfiguration {
     private String internalNamespace;
     private int internalReplicationFactor;
     private String musicNamespace;
+    private int musicReplicationFactor;
     private String tableToPartitionName;
     private String partitionInformationTableName;
     private String redoHistoryTableName;
@@ -66,6 +67,7 @@ public class TablesConfiguration {
      */
     public List<NodeConfiguration> initializeAndCreateNodeConfigurations() throws MDBCServiceException {
         logger.info("initializing the required spaces");
+        createKeyspaces();
         initInternalNamespace();
 
         List<NodeConfiguration> nodeConfigs = new ArrayList<>();
@@ -81,10 +83,6 @@ public class TablesConfiguration {
 
             String partitionId;
             if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){
-                if(partitionInfo.replicationFactor==0){
-                    logger.error("Replication factor and partition id are both empty, and this is an invalid configuration" );
-                    throw new MDBCServiceException("Replication factor and partition id are both empty, and this is an invalid configuration");
-                }
                 //1) Create a row in the partition info table
                 partitionId = MDBCUtils.generateTimebasedUniqueKey().toString();
             }
@@ -110,6 +108,12 @@ public class TablesConfiguration {
         return nodeConfigs;
     }
 
+    private void createKeyspaces() throws MDBCServiceException {
+        MusicMixin.createKeyspace(internalNamespace,internalReplicationFactor);
+        MusicMixin.createKeyspace(musicNamespace,musicReplicationFactor);
+
+    }
+
     private void checkIfMriIsEmpty(String mriTableName) throws MDBCServiceException {
         //First check if table exists
         StringBuilder checkTableExistsString = new StringBuilder("SELECT table_name FROM system_schema.tables WHERE keyspace_name='")
@@ -172,7 +176,6 @@ public class TablesConfiguration {
         private String mriTableName;
         private String mtxdTableName;
         private String partitionId;
-        private int replicationFactor;
 
         public List<Range> getTables() {
             return tables;
@@ -206,14 +209,6 @@ public class TablesConfiguration {
             this.partitionId = partitionId;
         }
 
-        public int getReplicationFactor() {
-            return replicationFactor;
-        }
-
-        public void setReplicationFactor(int replicationFactor) {
-            this.replicationFactor = replicationFactor;
-        }
-
         public String getMtxdTableName(){
            return mtxdTableName;
         }
index 2e4e0ee..8cbbfec 100755 (executable)
@@ -9,15 +9,15 @@
       "owner": "",
       "mriTableName": "musicrangeinformation",
       "mtxdTableName": "musictxdigest",
-      "partitionId": "",
-      "replicationFactor": 1
+      "partitionId": ""
     }
   ],
+  "internalNamespace": "music_internal",
+  "internalReplicationFactor": 1,
   "musicNamespace": "namespace",
+  "musicReplicationFactor": 1,
   "tableToPartitionName": "tabletopartition",
   "partitionInformationTableName": "partitioninfo",
   "redoHistoryTableName": "redohistory",
-  "sqlDatabaseName": "test",
-  "internalNamespace": "music_internal",
-  "internalReplicationFactor": 1
+  "sqlDatabaseName": "test"
 }
index 35b6121..49d4c71 100755 (executable)
@@ -321,7 +321,8 @@ public interface MusicInterface {
 
     OwnershipAndCheckpoint getOwnAndCheck();
     
-    
     ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException;
+
+    void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException;
 }
 
index e8028c1..999c67f 100755 (executable)
@@ -36,6 +36,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang3.tuple.Pair;
@@ -57,11 +58,7 @@ import org.onap.music.mdbc.TableInfo;
 import org.onap.music.mdbc.ownership.Dag;
 import org.onap.music.mdbc.ownership.DagNode;
 import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
-import org.onap.music.mdbc.tables.MusicRangeInformationRow;
-import org.onap.music.mdbc.tables.MusicTxDigestId;
-import org.onap.music.mdbc.tables.RangeDependency;
-import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.TxCommitProgress;
+import org.onap.music.mdbc.tables.*;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.DataType;
@@ -208,7 +205,7 @@ public class MusicMixin implements MusicInterface {
     private boolean keyspace_created   = false;
     private Map<String, PreparedStatement> ps_cache = new HashMap<>();
     private Set<String> in_progress    = Collections.synchronizedSet(new HashSet<String>());
-    private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied;
+    private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
     private OwnershipAndCheckpoint ownAndCheck;
 
     public MusicMixin() {
@@ -243,7 +240,7 @@ public class MusicMixin implements MusicInterface {
         String t = info.getProperty(KEY_TIMEOUT);
         this.timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t);
 
-        alreadyApplied = new HashMap<>();
+        alreadyApplied = new ConcurrentHashMap<>();
         ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied,timeout);
 
         initializeMetricTables();
@@ -263,21 +260,25 @@ public class MusicMixin implements MusicInterface {
      */
     @Override
     public void createKeyspace() throws MDBCServiceException {
+        createKeyspace(this.music_ns,this.music_rfactor);
+    }
 
+    public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException {
         Map<String,Object> replicationInfo = new HashMap<>();
         replicationInfo.put("'class'", "'SimpleStrategy'");
-        replicationInfo.put("'replication_factor'", music_rfactor);
+        replicationInfo.put("'replication_factor'", replicationFactor);
 
         PreparedQueryObject queryObject = new PreparedQueryObject();
         queryObject.appendQueryString(
-            "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns +
+            "CREATE KEYSPACE IF NOT EXISTS " + keyspace +
                 " WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
 
         try {
             MusicCore.nonKeyRelatedPut(queryObject, "eventual");
         } catch (MusicServiceException e) {
-            if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) {
-                throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage(), e);
+            if (!e.getMessage().equals("Keyspace "+keyspace+" already exists")) {
+                throw new MDBCServiceException("Error creating namespace: "+keyspace+". Internal error:"+e.getErrorMessage(),
+                    e);
             }
         }
     }
@@ -1335,6 +1336,7 @@ public class MusicMixin implements MusicInterface {
         //0. See if reference to lock was already created
         String lockId = partition.getLockId();
         if(mriIndex==null || lockId == null || lockId.isEmpty()) {
+            //\TODO fix this
             own(partition.getSnapshot(),partition, MDBCUtils.generateTimebasedUniqueKey());
         }
 
@@ -1361,7 +1363,7 @@ public class MusicMixin implements MusicInterface {
             } catch (IOException e) {
                 throw new MDBCServiceException("Failed to serialized transaction digest with error " + e.toString(), e);
             }
-            MusicTxDigestId digestId = new MusicTxDigestId(commitId, -1);
+            MusicTxDigestId digestId = new MusicTxDigestId(mriIndex, -1);
             addTxDigest(digestId, serializedTransactionDigest);
             //2. Save RRT index to RQ
             if (progressKeeper != null) {
@@ -1369,6 +1371,20 @@ public class MusicMixin implements MusicInterface {
             }
             //3. Append RRT index into the corresponding TIT row array
             appendToRedoLog(partition, digestId);
+            List<Range> ranges = partition.getSnapshot();
+            for(Range r : ranges) {
+                if(!alreadyApplied.containsKey(r)){
+                    throw new MDBCServiceException("already applied data structure was not updated correctly and range "
+                    +r+" is not contained");
+                }
+                Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r);
+                MriReference key = rowAndIndex.getKey();
+                if(!mriIndex.equals(key.index)){
+                    throw new MDBCServiceException("already applied data structure was not updated correctly and range "+
+                        r+" is not pointing to row: "+mriIndex.toString());
+                }
+                alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1));
+            }
         }
     }
 
@@ -1936,8 +1952,17 @@ public class MusicMixin implements MusicInterface {
         return ecDigestList;
     }
 
+    @Override
+    public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
+        List<Range> snapshot = partition.getSnapshot();
+        UUID row = partition.getMRIIndex();
+        for(Range r : snapshot){
+            alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
+        }
+
+    }
+
 
-    
     ResultSet getAllMriCassandraRows() throws MDBCServiceException {
         StringBuilder cqlOperation = new StringBuilder();
         cqlOperation.append("SELECT * FROM ")
index 00abe85..64f4e0c 100755 (executable)
@@ -949,9 +949,12 @@ NEW.field refers to the new value
        StringBuilder keyCondStmt = new StringBuilder();
        String and = "";
        for (String key: primaryKeys.keySet()) {
-               Object val = primaryKeys.get(key);
-               keyCondStmt.append(and + key + "=\"" + val + "\"");
-               and = " AND ";
+           // We cannot use the default primary key for the sql table and operations
+           if(!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
+                Object val = primaryKeys.get(key);
+                keyCondStmt.append(and + key + "=\"" + val + "\"");
+                and = " AND ";
+            }
        }
                return keyCondStmt.toString();
        }
index 68d1f19..02c5d7b 100644 (file)
@@ -29,6 +29,7 @@ import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.logging.EELFLoggerDelegate;
 import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MriReference;
 import org.onap.music.mdbc.tables.MriRowComparator;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
 
@@ -231,15 +232,15 @@ public class Dag {
         return toApplyNodes.isEmpty();
     }
 
-    public void setAlreadyApplied(Map<Range, Pair<MusicRangeInformationRow,Integer>> alreadyApplied, Set<Range> ranges)
+    public void setAlreadyApplied(Map<Range, Pair<MriReference,Integer>> alreadyApplied, Set<Range> ranges)
         throws MDBCServiceException {
         for(Map.Entry<UUID,DagNode> node : nodes.entrySet()){
             Set<Range> intersection = new HashSet<>(ranges);
             intersection.retainAll(node.getValue().getRangeSet());
             for(Range r : intersection){
                 if(alreadyApplied.containsKey(r)){
-                    final Pair<MusicRangeInformationRow, Integer> appliedPair = alreadyApplied.get(r);
-                    final MusicRangeInformationRow appliedRow = appliedPair.getKey();
+                    final Pair<MriReference, Integer> appliedPair = alreadyApplied.get(r);
+                    final MriReference appliedRow = appliedPair.getKey();
                     final int index = appliedPair.getValue();
                     final long appliedTimestamp = appliedRow.getTimestamp();
                     final long nodeTimestamp = node.getValue().getTimestamp();
index 4ccd21d..8ec1793 100644 (file)
@@ -33,6 +33,7 @@ import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.mixins.DBInterface;
 import org.onap.music.mdbc.mixins.LockResult;
 import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.tables.MriReference;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
 import org.onap.music.mdbc.tables.MusicTxDigestId;
 import org.onap.music.mdbc.tables.StagingTable;
@@ -42,7 +43,7 @@ public class OwnershipAndCheckpoint{
     private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class);
     private Lock checkpointLock;
     private AtomicBoolean change;
-    private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied;
+    private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
     private Map<UUID,Long> ownershipBeginTime;
     private long timeoutInMs;
 
@@ -50,7 +51,7 @@ public class OwnershipAndCheckpoint{
       this(new HashMap<>(),Long.MAX_VALUE);
     }
 
-    public OwnershipAndCheckpoint(Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied, long timeoutInMs){
+    public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, Integer>> alreadyApplied, long timeoutInMs){
         change = new AtomicBoolean(true);
         checkpointLock = new ReentrantLock();
         this.alreadyApplied = alreadyApplied;
@@ -114,6 +115,9 @@ public class OwnershipAndCheckpoint{
 
     public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, List<Range> ranges,
         Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException {
+        if(ranges.isEmpty()){
+            return;
+        }
         try {
             checkpointLock.lock();
             change.set(true);
@@ -156,6 +160,9 @@ public class OwnershipAndCheckpoint{
     }
 
     public void warmup(MusicInterface mi, DBInterface di, List<Range> ranges) throws MDBCServiceException {
+        if(ranges.isEmpty()){
+            return;
+        }
         boolean ready = false;
         change.set(true);
         Set<Range> rangeSet = new HashSet<Range>(ranges);
@@ -181,7 +188,8 @@ public class OwnershipAndCheckpoint{
                             final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey());
                             applyTxDigest(di, txDigest);
                             for (Range r : pair.getValue()) {
-                                alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index));
+                                MusicRangeInformationRow row = node.getRow();
+                                alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
                             }
                         }
                         pair = node.nextNotAppliedTransaction(rangeSet);
@@ -208,7 +216,8 @@ public class OwnershipAndCheckpoint{
                     final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey());
                     applyTxDigest(db, txDigest);
                     for (Range r : pair.getValue()) {
-                        alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index));
+                        MusicRangeInformationRow row = node.getRow();
+                        alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
                     }
                     pair = node.nextNotAppliedTransaction(rangeSet);
                     if (timeout(ownOpId)) {
index 69f2c31..8aad335 100755 (executable)
@@ -28,4 +28,6 @@ public final class MriReference {
                this.index=  index;
        }
 
+       public long getTimestamp() { return index.timestamp();}
+
 }
index 1da2d79..3b6953c 100644 (file)
@@ -21,10 +21,7 @@ package org.onap.music.mdbc.tables;
 
 import java.sql.SQLException;
 import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.logging.EELFLoggerDelegate;
@@ -70,23 +67,26 @@ public class MusicTxDigest {
                            continue;
                        }
                        //2) for each partition I don't own
-                       List<DatabasePartition> ranges = stateManager.getRanges();
-                       if(ranges.size()!=0) {
-                               DatabasePartition myPartition = ranges.get(0);
-                               for (UUID partition : partitions) {
-                                       if (!partition.equals(myPartition.getMRIIndex())) {
-                                               try {
-                                                       //replayDigestForPartition(mi, partition, dbi);
-                                                   mi.getOwnAndCheck().warmup(mi, dbi, myPartition.getSnapshot());
-                                               } catch (MDBCServiceException e) {
-                                                       logger.error("Unable to update for partition : " + partition + ". " + e.getMessage());
-                                                       continue;
-                                               }
-                                       }
-                               }
-                       }
-               
-                       //Step 3: ReplayDigest() for E.C conditions
+            final List<Range> warmuplist = stateManager.getWarmupRanges();
+                       if(warmuplist!=null) {
+                final Set<Range> warmupRanges = new HashSet(warmuplist);
+                final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
+                List<Range> missingRanges = new ArrayList<>();
+                if (currentPartitions.size() != 0) {
+                    for (DatabasePartition part : currentPartitions) {
+                        List<Range> partitionRanges = part.getSnapshot();
+                        warmupRanges.removeAll(partitionRanges);
+                    }
+                    try {
+                        mi.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+                    } catch (MDBCServiceException e) {
+                        logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
+                        continue;
+                    }
+                }
+            }
+
+    //Step 3: ReplayDigest() for E.C conditions
                        try {
                                replayDigest(mi,dbi);
                        } catch (MDBCServiceException e) {
index a676f70..21f3e92 100755 (executable)
@@ -1,5 +1,5 @@
 cassandra.host =\
-  143.215.128.49
+  192.168.1.19
 cassandra.user =\
   metric
 cassandra.password =\
index 85e31cd..da64595 100644 (file)
@@ -34,6 +34,7 @@ import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.MDBCUtils;
 import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MriReference;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
 import org.onap.music.mdbc.tables.MusicTxDigestId;
 
@@ -191,7 +192,7 @@ public class DagTest {
 
     @Test
     public void nextToApply2() throws InterruptedException, MDBCServiceException {
-        Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied = new HashMap<>();
+        Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         List<Range> ranges = new ArrayList<>( Arrays.asList(
             new Range("range1")
@@ -206,7 +207,7 @@ public class DagTest {
             new MusicTxDigestId(MDBCUtils.generateUniqueKey(),1)
         ));
         MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2);
-        alreadyApplied.put(new Range("range1"),Pair.of(newRow, 0));
+        alreadyApplied.put(new Range("range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
         rows.add(newRow);
         MILLISECONDS.sleep(10);
         List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(