Optimize eventual reads 58/84358/1
authorstatta <statta@research.att.com>
Fri, 5 Apr 2019 14:54:06 +0000 (10:54 -0400)
committerstatta <statta@research.att.com>
Fri, 5 Apr 2019 14:54:29 +0000 (10:54 -0400)
Issue-ID: MUSIC-371
Change-Id: Ica2a27a16dd82e5c99cb5775d39c6526ed086187
Signed-off-by: statta <statta@research.att.com>
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/StagingTable.java
mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java

index c176ad9..39b1e21 100644 (file)
@@ -156,7 +156,7 @@ public class MusicMixin implements MusicInterface {
     static {
         // We only support the following type mappings currently (from DB -> Cassandra).
         // Anything else will likely cause a NullPointerException
-        typemap.put(Types.BIGINT,    "BIGINT");        // aka. IDENTITY
+        typemap.put(Types.BIGINT,    "BIGINT"); // aka. IDENTITY
         typemap.put(Types.BLOB,      "VARCHAR");
         typemap.put(Types.BOOLEAN,   "BOOLEAN");
         typemap.put(Types.CLOB,      "BLOB");
@@ -169,10 +169,10 @@ public class MusicMixin implements MusicInterface {
         typemap.put(Types.TIMESTAMP, "VARCHAR");
         typemap.put(Types.VARBINARY, "BLOB");
         typemap.put(Types.VARCHAR,   "VARCHAR");
-        typemap.put(Types.CHAR,         "VARCHAR");
+        typemap.put(Types.CHAR,      "VARCHAR");
         //The "Hacks", these don't have a direct mapping
-        //typemap.put(Types.DATE,       "VARCHAR");
-        //typemap.put(Types.DATE,       "TIMESTAMP");
+        //typemap.put(Types.DATE,        "VARCHAR");
+        //typemap.put(Types.DATE,        "TIMESTAMP");
     }
 
 
@@ -389,12 +389,12 @@ public class MusicMixin implements MusicInterface {
     @Override
     public void createDirtyRowTable(TableInfo ti, String tableName) {
         // create dirtybitsTable at all replicas
-//             for (String repl : allReplicaIds) {
-////                   String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
-////                   String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
-//                     cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
-//                     executeMusicWriteQuery(cql);
-//             }
+//      for (String repl : allReplicaIds) {
+////            String dirtyRowsTableName = "dirty_"+tableName+"_"+allReplicaIds[i];
+////            String dirtyTableQuery = "CREATE TABLE IF NOT EXISTS "+music_ns+"."+ dirtyRowsTableName+" (dirtyRowKeys text PRIMARY KEY);";
+//          cql = String.format("CREATE TABLE IF NOT EXISTS %s.DIRTY_%s_%s (dirtyRowKeys TEXT PRIMARY KEY);", music_ns, tableName, repl);
+//          executeMusicWriteQuery(cql);
+//      }
         StringBuilder ddl = new StringBuilder("REPLICA__ TEXT");
         StringBuilder cols = new StringBuilder("REPLICA__");
         for (int i = 0; i < ti.columns.size(); i++) {
@@ -451,8 +451,8 @@ public class MusicMixin implements MusicInterface {
             System.err.println("markDIrtyRow need to fix primary key");
         }
         String cql = String.format("INSERT INTO %s.DIRTY_%s (%s) VALUES (%s);", music_ns, tableName, cols.toString(), vals.toString());
-               /*Session sess = getMusicSession();
-               PreparedStatement ps = getPreparedStatementFromCache(cql);*/
+        /*Session sess = getMusicSession();
+        PreparedStatement ps = getPreparedStatementFromCache(cql);*/
         String primaryKey;
         if(ti.hasKey()) {
             primaryKey = getMusicKeyFromRow(ti,tableName, keys);
@@ -476,13 +476,13 @@ public class MusicMixin implements MusicInterface {
             pQueryObject.addValue(pkObj);
             updateMusicDB(tableName, primaryKey, pQueryObject);
             //if (!repl.equals(myId)) {
-                               /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
-                               vallist.set(0, repl);
-                               BoundStatement bound = ps.bind(vallist.toArray());
-                               bound.setReadTimeoutMillis(60000);
-                               synchronized (sess) {
-                                       sess.execute(bound);
-                               }*/
+                /*logger.info(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
+                vallist.set(0, repl);
+                BoundStatement bound = ps.bind(vallist.toArray());
+                bound.setReadTimeoutMillis(60000);
+                synchronized (sess) {
+                    sess.execute(bound);
+                }*/
             //}
 
         }
@@ -514,13 +514,13 @@ public class MusicMixin implements MusicInterface {
         if(rt.getResult().getResult().toLowerCase().equals("failure")) {
             System.out.println("Failure while cleanDirtyRow..."+rt.getMessage());
         }
-               /*Session sess = getMusicSession();
-               PreparedStatement ps = getPreparedStatementFromCache(cql);
-               BoundStatement bound = ps.bind(vallist.toArray());
-               bound.setReadTimeoutMillis(60000);
-               synchronized (sess) {
-                       sess.execute(bound);
-               }*/
+        /*Session sess = getMusicSession();
+        PreparedStatement ps = getPreparedStatementFromCache(cql);
+        BoundStatement bound = ps.bind(vallist.toArray());
+        bound.setReadTimeoutMillis(60000);
+        synchronized (sess) {
+            sess.execute(bound);
+        }*/
     }
     /**
      * Get a list of "dirty rows" for a table.  The dirty rows returned apply only to this replica,
@@ -533,14 +533,14 @@ public class MusicMixin implements MusicInterface {
         String cql = String.format("SELECT * FROM %s.DIRTY_%s WHERE REPLICA__=?;", music_ns, tableName);
         ResultSet results = null;
         logger.debug(EELFLoggerDelegate.applicationLogger,"Executing MUSIC write:"+ cql);
-               
-               /*Session sess = getMusicSession();
-               PreparedStatement ps = getPreparedStatementFromCache(cql);
-               BoundStatement bound = ps.bind(new Object[] { myId });
-               bound.setReadTimeoutMillis(60000);
-               synchronized (sess) {
-                       results = sess.execute(bound);
-               }*/
+        
+        /*Session sess = getMusicSession();
+        PreparedStatement ps = getPreparedStatementFromCache(cql);
+        BoundStatement bound = ps.bind(new Object[] { myId });
+        bound.setReadTimeoutMillis(60000);
+        synchronized (sess) {
+            results = sess.execute(bound);
+        }*/
         PreparedQueryObject pQueryObject = new PreparedQueryObject();
         pQueryObject.appendQueryString(cql);
         try {
@@ -646,14 +646,14 @@ public class MusicMixin implements MusicInterface {
         String cql = String.format("DELETE FROM %s.%s WHERE %s;", music_ns, tableName, where.toString());
         logger.error(EELFLoggerDelegate.errorLogger,"Executing MUSIC write:"+ cql);
         pQueryObject.appendQueryString(cql);
-               
-               /*PreparedStatement ps = getPreparedStatementFromCache(cql);
-               BoundStatement bound = ps.bind(vallist.toArray());
-               bound.setReadTimeoutMillis(60000);
-               Session sess = getMusicSession();
-               synchronized (sess) {
-                       sess.execute(bound);
-               }*/
+        
+        /*PreparedStatement ps = getPreparedStatementFromCache(cql);
+        BoundStatement bound = ps.bind(vallist.toArray());
+        bound.setReadTimeoutMillis(60000);
+        Session sess = getMusicSession();
+        synchronized (sess) {
+            sess.execute(bound);
+        }*/
         String primaryKey = getMusicKeyFromRow(ti,tableName, oldRow);
 
         updateMusicDB(tableName, primaryKey, pQueryObject);
@@ -716,15 +716,15 @@ public class MusicMixin implements MusicInterface {
 
                 e.printStackTrace();
             }
-                       /*
-                       Session sess = getMusicSession();
-                       PreparedStatement ps = getPreparedStatementFromCache(cql);
-                       BoundStatement bound = ps.bind(vallist.toArray());
-                       bound.setReadTimeoutMillis(60000);
-                       ResultSet dirtyRows = null;
-                       synchronized (sess) {
-                               dirtyRows = sess.execute(bound);
-                       }*/
+            /*
+            Session sess = getMusicSession();
+            PreparedStatement ps = getPreparedStatementFromCache(cql);
+            BoundStatement bound = ps.bind(vallist.toArray());
+            bound.setReadTimeoutMillis(60000);
+            ResultSet dirtyRows = null;
+            synchronized (sess) {
+                dirtyRows = sess.execute(bound);
+            }*/
             List<Row> rows = dirtyRows.all();
             if (rows.isEmpty()) {
                 // No rows, the row must have been deleted
@@ -771,48 +771,48 @@ public class MusicMixin implements MusicInterface {
         }
 
         logger.debug("Blocking rowid: "+rowid);
-        in_progress.add(rowid);                        // Block propagation of the following INSERT/UPDATE
+        in_progress.add(rowid);         // Block propagation of the following INSERT/UPDATE
 
         dbi.insertRowIntoSqlDb(tableName, map);
 
         logger.debug("Unblocking rowid: "+rowid);
-        in_progress.remove(rowid);             // Unblock propagation
-
-//             try {
-//                     String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
-//                     executeSQLWrite(sql);
-//             } catch (SQLException e) {
-//                     logger.debug("Insert failed because row exists, do an update");
-//                     // TODO - rewrite this UPDATE command should not update key fields
-//                     String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString());
-//                     try {
-//                             executeSQLWrite(sql);
-//                     } catch (SQLException e1) {
-//                             e1.printStackTrace();
-//                     }
-//             }
+        in_progress.remove(rowid);      // Unblock propagation
+
+//      try {
+//          String sql = String.format("INSERT INTO %s (%s) VALUES (%s);", tableName, fields.toString(), values.toString());
+//          executeSQLWrite(sql);
+//      } catch (SQLException e) {
+//          logger.debug("Insert failed because row exists, do an update");
+//          // TODO - rewrite this UPDATE command should not update key fields
+//          String sql = String.format("UPDATE %s SET (%s) = (%s) WHERE %s", tableName, fields.toString(), values.toString(), where.toString());
+//          try {
+//              executeSQLWrite(sql);
+//          } catch (SQLException e1) {
+//              e1.printStackTrace();
+//          }
+//      }
 
         ti = dbi.getTableInfo(tableName);
         cleanDirtyRow(ti, tableName, new JSONObject(vallist));
 
-//             String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
-//             java.sql.ResultSet rs = executeSQLRead(selectQuery);
-//             String dbWriteQuery=null;
-//             try {
-//                     if(rs.next()){//this entry is there, do an update
-//                             dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";";
-//                     }else
-//                             dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";";
-//                     executeSQLWrite(dbWriteQuery);
-//             } catch (SQLException e) {
-//                     // ZZTODO Auto-generated catch block
-//                     e.printStackTrace();
-//             }
+//      String selectQuery = "select "+ primaryKeyName+" FROM "+tableName+" WHERE "+primaryKeyName+"="+primaryKeyValue+";";
+//      java.sql.ResultSet rs = executeSQLRead(selectQuery);
+//      String dbWriteQuery=null;
+//      try {
+//          if(rs.next()){//this entry is there, do an update
+//              dbWriteQuery = "UPDATE "+tableName+" SET "+columnNameString+" = "+ valueString +"WHERE "+primaryKeyName+"="+primaryKeyValue+";";
+//          }else
+//              dbWriteQuery = "INSERT INTO "+tableName+" VALUES"+valueString+";";
+//          executeSQLWrite(dbWriteQuery);
+//      } catch (SQLException e) {
+//          // ZZTODO Auto-generated catch block
+//          e.printStackTrace();
+//      }
 
         //clean the music dirty bits table
-//             String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
-//             String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
-//             executeMusicWriteQuery(deleteQuery);
+//      String dirtyRowIdsTableName = music_ns+".DIRTY_"+tableName+"_"+myId;
+//      String deleteQuery = "DELETE FROM "+dirtyRowIdsTableName+" WHERE dirtyRowKeys=$$"+primaryKeyValue+"$$;";
+//      executeMusicWriteQuery(deleteQuery);
     }
     private Object getValue(Row musicRow, String colname) {
         ColumnDefinitions cdef = musicRow.getColumnDefinitions();
@@ -914,14 +914,14 @@ public class MusicMixin implements MusicInterface {
             pQueryObject.appendQueryString(cql);
             String primaryKey = getMusicKeyFromRow(ti,tableName, changedRow);
             updateMusicDB(tableName, primaryKey, pQueryObject);
-                       
-                       /*PreparedStatement ps = getPreparedStatementFromCache(cql);
-                       BoundStatement bound = ps.bind(newrow);
-                       bound.setReadTimeoutMillis(60000);
-                       Session sess = getMusicSession();
-                       synchronized (sess) {
-                               sess.execute(bound);
-                       }*/
+            
+            /*PreparedStatement ps = getPreparedStatementFromCache(cql);
+            BoundStatement bound = ps.bind(newrow);
+            bound.setReadTimeoutMillis(60000);
+            Session sess = getMusicSession();
+            synchronized (sess) {
+                sess.execute(bound);
+            }*/
             // Mark the dirty rows in music for all the replicas but us
             markDirtyRow(ti,tableName, changedRow);
         }
@@ -978,7 +978,7 @@ public class MusicMixin implements MusicInterface {
         if(rt.getResult().getResult().toLowerCase().equals("failure")) {
             logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
         }
-               
+        
     }
 
     /**
@@ -1287,7 +1287,10 @@ public class MusicMixin implements MusicInterface {
     @Override
     public void commitLog(DatabasePartition partition,List<Range> eventualRanges,  StagingTable transactionDigest,
                           String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException {
-            
+        
+        // first deal with commit for eventually consistent tables
+        filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
+        
         if(partition==null){
             logger.warn("Trying tcommit log with null partition");
             return;
@@ -1382,7 +1385,7 @@ public class MusicMixin implements MusicInterface {
             throw new MDBCServiceException();
         }
         
-        if(!transactionDigest.isEmpty()) {
+        if(!transactionDigest.isEventualEmpty()) {
             ByteBuffer serialized = transactionDigest.getSerializedEventuallyStagingAndClean();
 
             if (serialized!=null && useCompression) {
@@ -1513,7 +1516,7 @@ public class MusicMixin implements MusicInterface {
             throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
         }
 
-       return getMRIRowFromCassandraRow(newRow);
+        return getMRIRowFromCassandraRow(newRow);
     }
 
     @Override
@@ -1529,18 +1532,18 @@ public class MusicMixin implements MusicInterface {
             logger.error("Get operationt error: Failure to get row from MRI "+musicRangeInformationTableName);
             throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
         }
-       return getRangeDependenciesFromCassandraRow(newRow);
+        return getRangeDependenciesFromCassandraRow(newRow);
     }
 
     /**
      * This function creates the TransactionInformation table. It contain information related
      * to the transactions happening in a given partition.
-     *          * The schema of the table is
-     *                 * Id, uiid.
-     *                 * Partition, uuid id of the partition
-     *                 * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
-     *         * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
-     *         * Redo: list of uiids associated to the Redo Records Table
+     *   * The schema of the table is
+     *      * Id, uiid.
+     *      * Partition, uuid id of the partition
+     *      * LatestApplied, int indicates which values from the redologtable wast the last to be applied to the data tables
+     *      * Applied: boolean, indicates if all the values in this redo log table where already applied to data tables
+     *      * Redo: list of uiids associated to the Redo Records Table
      *
      */
     public static void createMusicRangeInformationTable(String namespace, String tableName) throws MDBCServiceException {
@@ -1727,8 +1730,8 @@ public class MusicMixin implements MusicInterface {
 
     /**
      * This function creates the MusicEveTxDigest table. It contain information related to each eventual transaction committed
-     *         * LeaseId: id associated with the lease, text
-     *         * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
+     *  * LeaseId: id associated with the lease, text
+     *  * LeaseCounter: transaction number under this lease, bigint \TODO this may need to be a varint later
      *  * TransactionDigest: text that contains all the changes in the transaction
      */
     public static void createMusicEventualTxDigest(String musicEventualTxDigestTableName, String musicNamespace, int musicTxDigestTableNumber) throws MDBCServiceException {
@@ -1738,11 +1741,12 @@ public class MusicMixin implements MusicInterface {
                 "-" +
                 Integer.toString(musicTxDigestTableNumber);
         }
-        String priKey = "txTimeId";
+        String priKey = "txTimeId, year";
         StringBuilder fields = new StringBuilder();
         fields.append("txid uuid, ");
         fields.append("transactiondigest blob, ");
         fields.append("compressed boolean, ");
+        fields.append("year int, ");
         fields.append("txTimeId TIMEUUID ");//notice lack of ','
         String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));", musicNamespace, tableName, fields, priKey);
         try {
@@ -1832,19 +1836,17 @@ public class MusicMixin implements MusicInterface {
     public void addEventualTxDigest(MusicTxDigestId newId, ByteBuffer transactionDigest) throws MDBCServiceException {
         //createTxDigestRow(music_ns,musicTxDigestTable,newId,transactionDigest);
         PreparedQueryObject query = new PreparedQueryObject();
-        String cqlQuery = "INSERT INTO " +
-            this.music_ns +
-            '.' +
-            this.musicEventualTxDigestTableName +
-            " (txid,transactiondigest,compressed,txTimeId) " +
-            "VALUES (" +
-            newId.transactionId+ ",'" +
-            transactionDigest + "'," +
-            useCompression + ","+
-           // "toTimestamp(now())" +
-           "now()" +
-            ");";
-        query.appendQueryString(cqlQuery);
+        int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
+        
+        
+        String cql = String.format("INSERT INTO %s.%s (txid,transactiondigest,compressed,year,txTimeId ) VALUES (?,?,?,?,now());",this.music_ns,
+                this.musicEventualTxDigestTableName);
+            query.appendQueryString(cql);
+            query.addValue( newId.transactionId);
+            query.addValue(transactionDigest);
+            query.addValue(useCompression);
+            query.addValue(year);
+           // query.appendQueryString(cqlQuery);
         //\TODO check if I am not shooting on my own foot
         try {
             MusicCore.nonKeyRelatedPut(query,"critical");
@@ -1885,6 +1887,15 @@ public class MusicMixin implements MusicInterface {
     
     @Override
     public LinkedHashMap<UUID, StagingTable> getEveTxDigest(String nodeName) throws MDBCServiceException {
+        int year = java.util.Calendar.getInstance().get(java.util.Calendar.YEAR);
+        StringBuffer yearSb = new StringBuffer();
+        String sep = "";
+        for (int y=2019; y<=year; y++) {
+            yearSb.append(sep);
+            yearSb.append(y);
+            sep = ",";
+        }
+
         StagingTable changes;
         String cql;
         LinkedHashMap<UUID, StagingTable> ecDigestInformation = new LinkedHashMap<>();
@@ -1893,12 +1904,12 @@ public class MusicMixin implements MusicInterface {
         
         if (musicevetxdigestNodeinfoTimeID != null) {
             // this will fetch only few records based on the time-stamp condition.
-            cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName);
+            cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) AND txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
             pQueryObject.appendQueryString(cql);
             pQueryObject.addValue(musicevetxdigestNodeinfoTimeID);
         } else {
             // This is going to Fetch all the Transactiondigest records from the musicevetxdigest table.
-            cql = String.format("SELECT * FROM %s.%s LIMIT 10;", music_ns, this.musicEventualTxDigestTableName);
+            cql = String.format("SELECT * FROM %s.%s WHERE year in (%s) LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName, yearSb.toString());
             pQueryObject.appendQueryString(cql);
         }
         
index dbed9e4..26ee73c 100755 (executable)
@@ -91,7 +91,7 @@ public class StagingTable {
         builderInitialized=true;
                digestBuilder = CompleteDigest.newBuilder();
                this.eventuallyConsistentRanges=eventuallyConsistentRanges;
-               eventuallyBuilder = (!this.eventuallyConsistentRanges.isEmpty())?null:CompleteDigest.newBuilder();
+               eventuallyBuilder = (this.eventuallyConsistentRanges.isEmpty())?null:CompleteDigest.newBuilder();
        }
 
        public StagingTable(ByteBuffer serialized) throws MDBCServiceException {
@@ -242,7 +242,15 @@ public class StagingTable {
     }
 
     synchronized public boolean isEmpty() {
-           return (digestBuilder.getRowsCount()==0);
+           return (digestBuilder.getRowsCount()==0 && eventuallyBuilder.getRowsCount()==0);
+    }
+    
+    synchronized public boolean isStrongEmpty() {
+        return (digestBuilder.getRowsCount()==0);
+    }
+    
+    synchronized public boolean isEventualEmpty() {
+        return (eventuallyBuilder.getRowsCount()==0);
     }
        
        synchronized public void clear() throws MDBCServiceException {
index aba8cb4..41a943e 100644 (file)
 
 package org.onap.music.mdbc.mixins;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.junit.Assert.*;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-
-import java.util.*;
-
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Consumer;
 import org.cassandraunit.utils.EmbeddedCassandraServerHelper;
-
-
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.rules.Timeout;
 import org.onap.music.datastore.MusicDataStore;
 import org.onap.music.datastore.MusicDataStoreHandle;
 import org.onap.music.exceptions.MDBCServiceException;
@@ -51,12 +49,13 @@ import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.MDBCUtils;
 import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.StateManager;
-import org.onap.music.mdbc.TestUtils;
-import org.onap.music.mdbc.ownership.Dag;
-import org.onap.music.mdbc.ownership.DagNode;
+import org.onap.music.mdbc.proto.ProtoDigest.Digest.CompleteDigest;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
 import org.onap.music.mdbc.tables.MusicTxDigestId;
-import org.onap.music.service.impl.MusicCassaCore;
+import org.onap.music.mdbc.tables.StagingTable;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.google.protobuf.InvalidProtocolBufferException;
 
 public class MusicMixinTest {
        
@@ -109,6 +108,7 @@ public class MusicMixinTest {
             Properties properties = new Properties();
             properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace);
             properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName);
+            properties.setProperty(MusicMixin.KEY_COMPRESSION, Boolean.toString(true));
             mixin=new MusicMixin(stateManager, mdbcServerName,properties);
         } catch (MDBCServiceException e) {
             fail("error creating music mixin");
@@ -246,4 +246,39 @@ public class MusicMixinTest {
     @Test
     public void relinquishIfRequired() {
     }
+    
+    @Test
+    public void getEveTxDigest() throws Exception {
+        
+        mixin.createMusicEventualTxDigest();
+        ByteBuffer compressed = mockCompressedProtoByteBuff();
+        MusicTxDigestId digestId = new MusicTxDigestId(UUID.randomUUID(), 1);
+        mixin.addEventualTxDigest(digestId, compressed);
+        
+        LinkedHashMap<UUID, StagingTable> digest =  mixin.getEveTxDigest("n1");
+        
+        Consumer<Map.Entry<UUID,StagingTable>> consumer = new Consumer<Map.Entry<UUID,StagingTable>>() {
+
+            @Override
+            public void accept(Entry<UUID, StagingTable> mapEntry) {
+                assertNotNull(mapEntry.getValue());
+            }
+            
+        };
+       
+        digest.entrySet().forEach(consumer);
+        
+        
+        
+        
+    }
+
+    protected ByteBuffer mockCompressedProtoByteBuff() throws MDBCServiceException, InvalidProtocolBufferException {
+        CompleteDigest instance = CompleteDigest.getDefaultInstance();
+        // CompleteDigest instance  = CompleteDigest.parseFrom(ByteBuffer.wrap("Test".getBytes()));
+        byte[] bytes = instance.toByteArray();
+        ByteBuffer serialized = ByteBuffer.wrap(bytes);
+        ByteBuffer compressed = StagingTable.Compress(serialized);
+        return compressed;
+    }
 }