Add optimization to Eve Logic 28/77728/1
authorst782s <statta@research.att.com>
Fri, 1 Feb 2019 15:54:59 +0000 (10:54 -0500)
committerst782s <statta@research.att.com>
Fri, 1 Feb 2019 15:54:59 +0000 (10:54 -0500)
Issue-ID: MUSIC-276
Change-Id: I5c2e7ee79301ebca71abda952153eed89eea3f8f
Signed-off-by: st782s <statta@research.att.com>
mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java [changed mode: 0755->0644]
mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcEventualTestClient.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java [changed mode: 0755->0644]
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java

old mode 100755 (executable)
new mode 100644 (file)
index b403dd2..6cc50ec
@@ -198,6 +198,14 @@ public class StateManager {
             eventualLock.unlock();
         }
     }
+    
+    public String getMdbcServerName() {
+        return mdbcServerName;
+    }
+
+    public void setMdbcServerName(String mdbcServerName) {
+        this.mdbcServerName = mdbcServerName;
+    }
 
     public void closeConnection(String connectionId){
         //\TODO check if there is a race condition
index 338e697..c5c2b99 100644 (file)
@@ -89,7 +89,7 @@ public class MdbcEventualTestClient {
 
         final String insertSQL = "INSERT INTO audit_log VALUES (1, 123, 123456789);";
         final String insertSQL1 = "DELETE FROM audit_log WHERE PersonID=1;";
-        final String insertSQL2 = "INSERT INTO audit_log VALUES (1, 123, 123456789);";
+        final String insertSQL2 = "INSERT INTO audit_log VALUES (3, 123, 123456789);";
         final String insertSQL3 = "UPDATE audit_log SET PersonID=124 where id=1;";
         final String insertSQL4 = "INSERT INTO audit_log VALUES (2, 234, 123456789);";
 
index 49d4c71..22c532b 100755 (executable)
 package org.onap.music.mdbc.mixins;
 
 import com.datastax.driver.core.ResultSet;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import org.json.JSONObject;
 import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.exceptions.MusicLockingException;
@@ -259,6 +255,15 @@ public interface MusicInterface {
      * @throws MDBCServiceException
      */
        HashMap<Range,StagingTable> getTxDigest(MusicTxDigestId id) throws MDBCServiceException;
+       
+       /**
+     * Function used to retrieve a given eventual transaction digest for the current node and deserialize it
+     * @param nodeName that identifies a node
+     * @return the deserialize transaction digest that can be applied to the local SQL database
+     * @throws MDBCServiceException
+     */
+       
+       public LinkedHashMap<UUID, HashMap<Range,StagingTable>> getEveTxDigest(String nodeName) throws MDBCServiceException;
 
     /**
      * Use this functions to verify ownership, and own new ranges
@@ -320,9 +325,9 @@ public interface MusicInterface {
     List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
 
     OwnershipAndCheckpoint getOwnAndCheck();
-    
-    ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException;
 
     void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException;
+    
+    public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
 }
 
old mode 100755 (executable)
new mode 100644 (file)
index 999c67f..963647c
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -38,7 +39,6 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang3.tuple.Pair;
 import org.json.JSONObject;
 import org.onap.music.datastore.Condition;
@@ -58,7 +58,12 @@ import org.onap.music.mdbc.TableInfo;
 import org.onap.music.mdbc.ownership.Dag;
 import org.onap.music.mdbc.ownership.DagNode;
 import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
-import org.onap.music.mdbc.tables.*;
+import org.onap.music.mdbc.tables.MriReference;
+import org.onap.music.mdbc.tables.MusicRangeInformationRow;
+import org.onap.music.mdbc.tables.MusicTxDigestId;
+import org.onap.music.mdbc.tables.RangeDependency;
+import org.onap.music.mdbc.tables.StagingTable;
+import org.onap.music.mdbc.tables.TxCommitProgress;
 import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.DataType;
@@ -119,6 +124,7 @@ public class MusicMixin implements MusicInterface {
     private String musicEventualTxDigestTableName = "musicevetxdigest";
     private String musicRangeInformationTableName = "musicrangeinformation";
     private String musicRangeDependencyTableName = "musicrangedependency";
+    private String musicNodeInfoTableName = "nodeinfo";
 
     private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class);
 
@@ -336,6 +342,7 @@ public class MusicMixin implements MusicInterface {
         try {
             createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
             createMusicEventualTxDigest();
+            createMusicNodeInfoTable();
             createMusicRangeInformationTable(this.music_ns,this.musicRangeInformationTableName);
             createMusicRangeDependencyTable();
         }
@@ -982,12 +989,7 @@ public class MusicMixin implements MusicInterface {
         if(rt.getResult().getResult().toLowerCase().equals("failure")) {
             logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
         }
-               /*Session sess = getMusicSession();
-               SimpleStatement s = new SimpleStatement(cql);
-               s.setReadTimeoutMillis(60000);
-               synchronized (sess) {
-                       sess.execute(s);
-               }*/
+               
     }
 
     /**
@@ -1319,6 +1321,9 @@ public class MusicMixin implements MusicInterface {
     @Override
     public void commitLog(DatabasePartition partition,List<Range> eventualRanges,  HashMap<Range,StagingTable> transactionDigest,
                           String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+        // first deal with commit for eventually consistent tables
+        filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
+        
         if(partition==null){
             logger.warn("Trying tcommit log with null partition");
             return;
@@ -1328,8 +1333,7 @@ public class MusicMixin implements MusicInterface {
             logger.warn("Trying to commit log with empty ranges");
             return;
         }
-        // first deal with commit for eventually consistent tables
-        filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
+        
  
         UUID mriIndex = partition.getMRIIndex();
         String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
@@ -1916,28 +1920,36 @@ public class MusicMixin implements MusicInterface {
     
     
     @Override
-    public ArrayList<HashMap<Range,StagingTable>> getEveTxDigest() throws MDBCServiceException {
+    public LinkedHashMap<UUID, HashMap<Range,StagingTable>> getEveTxDigest(String nodeName) throws MDBCServiceException {
         HashMap<Range,StagingTable> changes;
-        ArrayList<HashMap<Range,StagingTable>> ecDigestList = new ArrayList<HashMap<Range,StagingTable>>();
+        String cql;
+        LinkedHashMap<UUID, HashMap<Range,StagingTable>> ecDigestInformation = new LinkedHashMap<UUID, HashMap<Range,StagingTable>>();
+        String musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName);
+        PreparedQueryObject pQueryObject = new PreparedQueryObject();
+        
+        if (musicevetxdigestNodeinfoTimeID != null && !musicevetxdigestNodeinfoTimeID.isEmpty() ) {
+            // this will fetch only few records based on the time-stamp condition.
+            cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ?;", music_ns, this.musicEventualTxDigestTableName);
+            pQueryObject.appendQueryString(cql);
+            pQueryObject.addValue(musicevetxdigestNodeinfoTimeID);
             
-            //but was this timestamp is getting added as per post: https://dev.mysql.com/doc/refman/8.0/en/time-zone-leap-seconds.html
-            //Ex1: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest ORDER by ts;
-            //Ex2: SELECT * FROM ectxdigest ORDER by ts; or SELECT * FROM ectxdigest
-          //####### this will pull all records.. but REPLAY will be against specific records once the NODE it back ON-Line.
-            // I should get the last record timestamp so that I can put a where condition.
-            //EX3: SELECT uuid, txDigest, UNIX_TIMESTAMP(ts) FROM ectxdigest where UNIX_TIMESTAMP(ts)>UNIX_TIMESTAMP(<<Date/Time value from others>>) ORDER by ts;
-        String cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName);  
-            // Ex 1 & 2 might return millions of records!! things to consider outOfMemory issue, performance issue etc.. How to overcome??
-            // Ex 3: will return less records compare to Ex:1 and Ex:2.
-
-            // I need to get a ResultSet of all the records and give each row to the below HashMap.
-        ResultSet rs = executeMusicRead(cql);
+        } else {
+            // This is going to Fetch all the Transactiondigest records from the musicevetxdigest table.
+            cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName);
+            pQueryObject.appendQueryString(cql);
+        }
+        
+        // I need to get a ResultSet of all the records and give each row to the below HashMap.
+        ResultSet rs = executeMusicRead(pQueryObject.getQuery());
         while (!rs.isExhausted()) {
             Row row = rs.one();
-            String digest = row.getString("transactiondigest");
+            String digest = row.getString("transactiondigest");        
+            //String txTimeId = row.getString("txtimeid"); //???
+            UUID txTimeId = row.getUUID("txtimeid");    
             
             try {
                 changes = (HashMap<Range, StagingTable>) MDBCUtils.fromString(digest);
+                
             } catch (IOException e) {
                 logger.error("IOException when deserializing digest");
                 throw new MDBCServiceException("Deserializng digest failed with ioexception", e);
@@ -1946,10 +1958,10 @@ public class MusicMixin implements MusicInterface {
                 throw new MDBCServiceException("Deserializng digest failed with an invalid class", e);
             }
 
-            ecDigestList.add(changes);
+            ecDigestInformation.put(txTimeId, changes);
+
         }      
-            
-        return ecDigestList;
+        return ecDigestInformation;
     }
 
     @Override
@@ -2477,8 +2489,7 @@ public class MusicMixin implements MusicInterface {
         return result.one();
     }
 
-    private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
-        throws MDBCServiceException{
+    private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject) throws MDBCServiceException{
         ResultSet result = MusicCore.quorumGet(cqlObject);
         if(result == null || result.isExhausted()){
             throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
@@ -2546,5 +2557,89 @@ public class MusicMixin implements MusicInterface {
     public OwnershipAndCheckpoint getOwnAndCheck(){
         return ownAndCheck;
     }
+    
+    @Override
+    public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException{
+        
+           String cql = String.format("UPDATE %s.%s SET txtimeid = (%s), txupdatedatetime = now() WHERE nodename = ?;", music_ns, this.musicEventualTxDigestTableName, txTimeID);
+            PreparedQueryObject pQueryObject = new PreparedQueryObject();
+            pQueryObject.appendQueryString(cql);
+            pQueryObject.addValue(nodeName);
+        
+            executeMusicWriteQuery(pQueryObject.getQuery());
+            logger.info("Successfully updated nodeinfo table with txtimeid value: " + txTimeID + " against the node:" + nodeName);
+            
+        
+    }
+    
+    public void createMusicNodeInfoTable() throws MDBCServiceException {
+        createMusicNodeInfoTable(-1);
+    }
+    
+    /**
+     * This function creates the NodeInfo table. It contain information related
+     * to the nodes along with the updated transactionDigest details.
+     *   * The schema of the table is
+     *      * nodeId, uuid. 
+     *      * nodeName, text or varchar?? for now I am going ahead with "text".
+     *      * createDateTime, TIMEUUID.
+     *      * TxUpdateDateTime, TIMEUUID.
+     *      * TxTimeID, TIMEUUID.
+     *      * LastTxDigestID, uuid. (not needed as of now!!)
+     */
+    private void createMusicNodeInfoTable(int nodeInfoTableNumber) throws MDBCServiceException {
+        String tableName = this.musicNodeInfoTableName;
+        if(nodeInfoTableNumber >= 0) {
+            tableName = tableName +
+                "-" +
+                Integer.toString(nodeInfoTableNumber);
+        }
+
+        String priKey = "nodename";
+        StringBuilder fields = new StringBuilder();
+        fields.append("nodename text, ");
+        fields.append("createdatetime TIMEUUID, ");
+        fields.append("txupdatedatetime TIMEUUID, ");
+        fields.append("txtimeid TIMEUUID ");
+        //fields.append("LastTxDigestID uuid ");// Not needed as of now!     
+        
+        String cql = String.format(
+                "CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
+                this.music_ns,
+                tableName,
+                fields,
+                priKey);
+        
+        try {
+            executeMusicWriteQuery(this.music_ns,tableName,cql);
+        } catch (MDBCServiceException e) {
+            logger.error("Initialization error: Failure to create node information table");
+            throw(e);
+        }
+    }
+    
+    public String getTxTimeIdFromNodeInfo(String nodeName) throws MDBCServiceException {
+            // expecting NodeName from base-0.json file: which is : NJNode
+            //String nodeName = MdbcServer.stateManager.getMdbcServerName(); 
+            // this retrieves the NJNode row from Cassandra's NodeInfo table so that I can retrieve TimeStamp for further processing.
+        String cql = String.format("SELECT txtimeid FROM %s.%s WHERE nodeName = ?;", music_ns, musicNodeInfoTableName);  
+        PreparedQueryObject pQueryObject = new PreparedQueryObject();
+        pQueryObject.appendQueryString(cql);
+        pQueryObject.addValue(nodeName);
+        Row newRow;
+        try {
+            newRow = executeMusicUnlockedQuorumGet(pQueryObject);
+        } catch (MDBCServiceException e) {
+            logger.error("Get operation error: Failure to get row from nodeinfo with nodename:"+nodeName);
+            // TODO check underlying exception if no data and return empty string
+            return "";
+            //throw new MDBCServiceException("error:Failure to retrive nodeinfo details information", e);
+        }
+        
+        String txtimeid = newRow.getString("txtimeid");
+
+        return txtimeid;
+    }
+
 
 }
index 3b6953c..204292c 100644 (file)
@@ -104,26 +104,31 @@ public class MusicTxDigest {
         * @param dbi interface to the database that will replay the operations
         * @throws MDBCServiceException
         */
-       public static void replayDigest(MusicInterface mi, DBInterface dbi) throws MDBCServiceException {
-                                       //List<MusicTxDigestId> partitionsRedoLogTxIds = mi.getMusicRangeInformation(partitionId).getRedoLog();
-                                       //From where should I fetch TransactionsIDs ??? from NEW TABLE ?? or EXISING TABLE ?? << what the new SITE_TABLE details??
-                                       // --> It is a new table called ECTxDigest
-                                       //I should sort/ call a method which gives all the entires of  a table based on the time-stamp from Low to High
-
-               ArrayList<HashMap<Range, StagingTable>> ecTxDigest = mi.getEveTxDigest();
-               
-                                       //for (MusicTxDigestId txId: partitionsRedoLogTxIds) { // partitionsRedoLogTxIds --> this comes from new table where timeStamp > currentTimeStamp  ( THIS SHOULD BE lessthan.. which is ASC order)
-                                       //HashMap<Range, StagingTable> transaction = mi2.getEcTxDigest();  // Getting records from musictxdigest TABLE.
-               for (HashMap<Range, StagingTable> transaction: ecTxDigest) {
-                       try {
-                               dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
-                       } catch (SQLException e) {
-                               logger.error("EC:Rolling back the entire digest replay.");
-                               return;
-                       }
-                       logger.info("EC: Successfully replayed transaction ");
-               }
-       }
+   public void replayDigest(MusicInterface mi, DBInterface dbi) throws MDBCServiceException {
+             HashMap<Range, StagingTable> transaction;
+             String nodeName = stateManager.getMdbcServerName();
+             logger.info("Node Name: "+nodeName);
+             LinkedHashMap<UUID, HashMap<Range,StagingTable>> ecDigestInformation = mi.getEveTxDigest(nodeName);
+             Set<UUID> keys = ecDigestInformation.keySet();
+             
+             for(UUID txTimeID:keys){
+                 transaction = (HashMap<Range,StagingTable>) ecDigestInformation.get(txTimeID);
+                 
+                 try {
+                     dbi.replayTransaction(transaction); // I think this Might change if the data is coming from a new table.. ( what is the new table structure??)
+                 } catch (SQLException e) {
+                     logger.error("EC:Rolling back the entire digest replay.");
+                     return;
+                 }
+                 logger.info("EC: Successfully replayed transaction for txTimeID key: "+txTimeID);
+                
+                 try {
+                     mi.updateNodeInfoTableWithTxTimeIDKey(txTimeID, nodeName);
+                 } catch (MDBCServiceException e) {
+                     logger.error("EC:Rolling back the entire digest replay.");
+                 }
+             }
+         }
 
        
        /**