Warm up ranges across sites 24/82824/2
authorTschaen, Brendan <ctschaen@att.com>
Wed, 20 Mar 2019 16:12:06 +0000 (12:12 -0400)
committerTschaen, Brendan <ctschaen@att.com>
Wed, 20 Mar 2019 20:26:05 +0000 (16:26 -0400)
Add configuration to select which ranges we warm up. Default (null) is all ranges

Change-Id: I0f241563959cc9fb673ab6d8a87c45e8399086c2
Issue-ID: MUSIC-287
Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
mdbc-server/pom.xml
mdbc-server/src/main/java/org/onap/music/mdbc/Configuration.java
mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
mdbc-server/src/main/resources/music.properties
mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java [new file with mode: 0644]
mdbc-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker [new file with mode: 0644]

index 874fce7..f69cf2f 100755 (executable)
             <version>1.0.0</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.23.4</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>com.github.jsqlparser</groupId>
             <artifactId>jsqlparser</artifactId>
index 7e39772..ced5745 100755 (executable)
@@ -38,4 +38,10 @@ public class Configuration {
        public static final String TX_DAEMON_SLEEPTIME_S = "txdaemonsleeps";
        /** Default txDigest Daemon sleep time */
        public static final String TX_DAEMON_SLEEPTIME_S_DEFAULT = "10";
+    /**  The property name to use to provide a timeout to mdbc (ownership) */
+    public static final String KEY_OWNERSHIP_TIMEOUT = "mdbc_timeout";
+    /** The default property value to use for the MDBC timeout */
+    public static final long DEFAULT_OWNERSHIP_TIMEOUT = 5*60*60*1000;//default of 5 hours
+    /** The property name to provide comma separated list of ranges to warmup */
+    public static final String KEY_WARMUPRANGES = "warmupranges";
 }
index 430e783..f0d9832 100644 (file)
@@ -45,6 +45,7 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
@@ -82,21 +83,26 @@ public class StateManager {
     String cassandraUrl;
     private Properties info;
     
-    /**  The property name to use to provide a timeout to mdbc (ownership) */
-    public static final String KEY_TIMEOUT = "mdbc_timeout";
-    /** The default property value to use for the MDBC timeout */
-    public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
-    
     /** Identifier for this server instance */
     private String mdbcServerName;
     private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
     private final Lock eventualLock  = new ReentrantLock();
     private List<Range> eventualRanges;
+    /** lock for warmupRanges */
     private final Lock warmupLock = new ReentrantLock();
-    private List<Range> warmupRanges;
+    /** a set of ranges that should be periodically updated with latest information, if null all tables should be warmed up */
+    private Set<Range> rangesToWarmup;
+    /** map of transactions that have already been applied/updated in this sites SQL db */
     private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
     private OwnershipAndCheckpoint ownAndCheck;
 
+    /**
+     * For testing purposes only
+     */
+    @Deprecated
+    public StateManager() {
+    }
+    
        public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
         this.sqlDBName = sqlDBName;
         this.sqlDBUrl = sqlDBUrl;
@@ -118,11 +124,14 @@ public class StateManager {
         initMusic();
         initSqlDatabase(); 
 
-        String t = info.getProperty(KEY_TIMEOUT);
-        long timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t);
+        String t = info.getProperty(Configuration.KEY_OWNERSHIP_TIMEOUT);
+        long timeoutMs = (t == null) ? Configuration.DEFAULT_OWNERSHIP_TIMEOUT : Integer.parseInt(t);
         alreadyApplied = new ConcurrentHashMap<>();
-        ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
+        ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeoutMs);
         
+        rangesToWarmup = initWarmupRanges();
+        logger.info("Warmup ranges for this site is " + rangesToWarmup);
+
         MusicTxDigest txDaemon = new MusicTxDigest(this);
         txDaemon.startBackgroundDaemon(Integer.parseInt(
                        info.getProperty(Configuration.TX_DAEMON_SLEEPTIME_S, Configuration.TX_DAEMON_SLEEPTIME_S_DEFAULT))); 
@@ -160,6 +169,24 @@ public class StateManager {
             throw new MDBCServiceException(e.getMessage(), e);
         }
     }
+    
+    /**
+     * Get list of ranges to warmup from configuration file
+     * if no configuration is provided, will return null
+     * @return
+     */
+    private Set<Range> initWarmupRanges() {
+        String warmupString = info.getProperty(Configuration.KEY_WARMUPRANGES);
+        if (warmupString==null) {
+            return null;
+        }
+        Set<Range> warmupRanges = new HashSet<>();
+        String[] ranges = warmupString.split(",");
+        for (String range: ranges) {
+            warmupRanges.add(new Range(range.trim()));
+        }
+        return warmupRanges;
+    }
 
     public MusicInterface getMusicInterface() {
        return this.musicInterface;
@@ -169,23 +196,45 @@ public class StateManager {
         return new ArrayList<>(connectionRanges.values());
        }
 
-       public List<Range> getWarmupRanges(){
+    /**
+     * Get a list of ranges that are to be periodically warmed up
+     * 
+     * If no list is specified, all ranges except eventual consistency ranges are returned
+     * @return
+     */
+       public Set<Range> getRangesToWarmup() {
         warmupLock.lock();
-        List<Range> returnArray;
+        Set<Range> returnSet;
         try {
-            if(warmupRanges!=null) {
-                returnArray = new ArrayList<>(warmupRanges);
+            if(rangesToWarmup!=null) {
+                returnSet = rangesToWarmup;
             }
-            else{
-                returnArray = null;
+            else {
+                returnSet = getAllRanges();
+                for (Range eventualRange: eventualRanges) {
+                    returnSet.remove(eventualRange);
+                }
             }
         }
         finally{
            warmupLock.unlock();
         }
-        return returnArray;
+        return returnSet;
     }
 
+       /**
+        * Get a set of all ranges seen in the sql db
+        * @return
+        */
+       private Set<Range> getAllRanges() {
+           DBInterface dbi = ((MdbcConnection) getConnection("daemon")).getDBInterface();
+           return dbi.getSQLRangeSet();
+    }
+
+    /**
+        * Get a list of ranges that are eventually consistent
+        * @return
+        */
     public List<Range> getEventualRanges() {
         eventualLock.lock();
         List<Range> returnArray;
@@ -221,6 +270,10 @@ public class StateManager {
         this.mdbcServerName = mdbcServerName;
     }
 
+    /**
+     * Close connection and relinquish any locks held for that connection
+     * @param connectionId
+     */
     public void closeConnection(String connectionId){
         //\TODO check if there is a race condition
         if(mdbcConnections.containsKey(connectionId)) {
@@ -331,10 +384,10 @@ public class StateManager {
 
     }
 
-    public void setWarmupRanges(List<Range> warmupRanges) {
+    public void setWarmupRanges(Set<Range> warmupRanges) {
         warmupLock.lock();
         try {
-            this.warmupRanges = warmupRanges;
+            this.rangesToWarmup = warmupRanges;
         }
         finally{
             warmupLock.unlock();
index 6a17d4c..fd8651a 100644 (file)
@@ -50,11 +50,15 @@ public class MdbcTestClient {
             System.exit(1);\r
         }\r
         Connection connection;\r
-        try {\r
-            connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");\r
-        } catch (SQLException e) {\r
-            e.printStackTrace();\r
-            return;\r
+        try { \r
+            String metricURL = "http://localhost:300000/test"; \r
+            if(args[0] != null) { \r
+                metricURL = args[0]; \r
+            } \r
+            connection = DriverManager.getConnection("jdbc:avatica:remote:url=" + metricURL+ ";serialization=protobuf"); \r
+        } catch (SQLException e) { \r
+            e.printStackTrace(); \r
+            return; \r
         }\r
 \r
         try {\r
@@ -104,7 +108,7 @@ public class MdbcTestClient {
         }\r
 \r
         final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');";\r
-        final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;";\r
+        final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=2;";\r
         final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";\r
         final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';";\r
         final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";\r
@@ -119,11 +123,11 @@ public class MdbcTestClient {
         }\r
 \r
         try {\r
-            execute = insertStmt.execute(insertSQL);\r
+            //execute = insertStmt.execute(insertSQL);\r
             execute = insertStmt.execute(insertSQL1);\r
-            execute = insertStmt.execute(insertSQL2);\r
-            execute = insertStmt.execute(insertSQL3);\r
-            execute = insertStmt.execute(insertSQL4);\r
+            //execute = insertStmt.execute(insertSQL2);\r
+            //execute = insertStmt.execute(insertSQL3);\r
+            //execute = insertStmt.execute(insertSQL4);\r
 \r
         } catch (SQLException e) {\r
             e.printStackTrace();\r
index 85645f3..5b72482 100755 (executable)
@@ -49,8 +49,14 @@ public interface DBInterface {
         * Get a set of the table names in the database. The table names should be returned in UPPER CASE.
         * @return the set
         */
+       @Deprecated
        Set<String> getSQLTableSet();
        /**
+     * Get a set of the ranges in the database
+     * @return the set
+     */
+    Set<Range> getSQLRangeSet();
+       /**
         * Return the name of the database that the driver is connected to
         * @return
         */
index 420f9d4..7154b34 100755 (executable)
@@ -27,6 +27,7 @@ import java.sql.Statement;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -150,10 +151,7 @@ public class MySQLMixin implements DBInterface {
        public String getDatabaseName() {
                return this.dbName;
        }
-       /**
-        * Get a set of the table names in the database.
-        * @return the set
-        */
+
        @Override
        public Set<String> getSQLTableSet() {
                Set<String> set = new TreeSet<String>();
@@ -172,6 +170,30 @@ public class MySQLMixin implements DBInterface {
                logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set);
                return set;
        }
+       
+       @Override
+    public Set<Range> getSQLRangeSet() {
+        Set<String> set = new TreeSet<String>();
+        String sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+        try {
+            Statement stmt = jdbcConn.createStatement();
+            ResultSet rs = stmt.executeQuery(sql);
+            while (rs.next()) {
+                String s = rs.getString("TABLE_NAME");
+                set.add(s);
+            }
+            stmt.close();
+        } catch (SQLException e) {
+            logger.error(EELFLoggerDelegate.errorLogger,"getSQLTableSet: "+e);
+        }
+        logger.debug(EELFLoggerDelegate.applicationLogger,"getSQLTableSet returning: "+ set);
+        Set<Range> rangeSet = new HashSet<>();
+        for (String table: set) {
+            rangeSet.add(new Range(table));
+        }
+        return rangeSet;
+    }
+       
 /*
 mysql> describe tables;
 +-----------------+---------------------+------+-----+---------+-------+
index ddf26ce..057b550 100644 (file)
@@ -180,23 +180,30 @@ public class OwnershipAndCheckpoint{
             throw new MDBCServiceException("Error applying tx digest in local SQL",e);
         }
     }
-
-    public void warmup(MusicInterface mi, DBInterface di, List<Range> ranges) throws MDBCServiceException {
-        if(ranges.isEmpty()){
+    
+    /**
+     * Replay the updates for the partitions containing ranges to the local database
+     * @param mi
+     * @param di
+     * @param rangesToWarmup
+     * @throws MDBCServiceException
+     */
+    public void warmup(MusicInterface mi, DBInterface di, List<Range> rangesToWarmup) throws MDBCServiceException {
+        if(rangesToWarmup.isEmpty()){
             return;
         }
         boolean ready = false;
         change.set(true);
-        Set<Range> rangeSet = new HashSet<Range>(ranges);
+        Set<Range> rangeSet = new HashSet<Range>(rangesToWarmup);
         Dag dag = new Dag(false);
         while(!ready){
             if(change.get()){
                 change.set(false);
-                final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, ranges,false);
-                dag = Dag.getDag(rows,ranges);
+                final List<MusicRangeInformationRow> rows = extractRowsForRange(mi, rangesToWarmup,false);
+                dag = Dag.getDag(rows,rangesToWarmup);
             }
             else if(!dag.applied()){
-                DagNode node = dag.nextToApply(ranges);
+                DagNode node = dag.nextToApply(rangesToWarmup);
                 if(node!=null) {
                     Pair<MusicTxDigestId, List<Range>> pair = node.nextNotAppliedTransaction(rangeSet);
                     while (pair != null) {
@@ -208,7 +215,7 @@ public class OwnershipAndCheckpoint{
                             break;
                         } else {
                             final StagingTable txDigest = mi.getTxDigest(pair.getKey());
-                            applyTxDigest(ranges,di, txDigest);
+                            applyTxDigest(rangesToWarmup,di, txDigest);
                             for (Range r : pair.getValue()) {
                                 MusicRangeInformationRow row = node.getRow();
                                 alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
index 4db3315..5b3872a 100644 (file)
@@ -67,22 +67,18 @@ public class MusicTxDigest {
                            continue;
                        }
                        //2) for each partition I don't own
-            final List<Range> warmuplist = stateManager.getWarmupRanges();
-                       if(warmuplist!=null) {
-                final Set<Range> warmupRanges = new HashSet(warmuplist);
-                final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
-                List<Range> missingRanges = new ArrayList<>();
-                if (currentPartitions.size() != 0) {
-                    for (DatabasePartition part : currentPartitions) {
-                        List<Range> partitionRanges = part.getSnapshot();
-                        warmupRanges.removeAll(partitionRanges);
-                    }
-                    try {
-                        stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
-                    } catch (MDBCServiceException e) {
-                        logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
-                        continue;
-                    }
+            final Set<Range> warmupRanges = stateManager.getRangesToWarmup();
+            final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
+            if (currentPartitions.size() != 0) {
+                for (DatabasePartition part : currentPartitions) {
+                    List<Range> partitionRanges = part.getSnapshot();
+                    warmupRanges.removeAll(partitionRanges);
+                }
+                try {
+                    stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+                } catch (MDBCServiceException e) {
+                    logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
+                    continue;
                 }
             }
 
index 83dcb7c..0b34ff9 100755 (executable)
@@ -4,5 +4,5 @@ cassandra.user =\
   cassandra
 cassandra.password =\
   cassandra
-zookeeper.host =\
-  localhost
+music_namespace =\
+  mdbc_namespace
\ No newline at end of file
diff --git a/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java b/mdbc-server/src/test/java/org/onap/music/mdbc/StateManagerTest.java
new file mode 100644 (file)
index 0000000..899fff2
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+package org.onap.music.mdbc;
+
+import static org.junit.Assert.*;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.FieldSetter;
+import org.onap.music.exceptions.MDBCServiceException;
+import org.onap.music.mdbc.mixins.DBInterface;
+import org.onap.music.mdbc.tables.TxCommitProgress;
+
+public class StateManagerTest {
+
+    StateManager stateManager;
+    
+    @BeforeClass
+    public static void beforeClass() {
+        System.out.println("StateManagerTest");
+    }
+    
+    @Before
+    public void before() throws MDBCServiceException {
+        //shouldn't use separate constructor, but this will do for now
+        stateManager = new StateManager();
+    }
+
+    @Test
+    public void testGetEventualRanges() throws NoSuchFieldException, SecurityException {
+        List<Range> evList = new ArrayList<>();
+        evList.add(new Range("eventualRange"));
+        FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("eventualRanges"), evList);
+        assertEquals(evList, stateManager.getEventualRanges());
+    }
+    
+    @Test
+    public void testSetEventualRanges() {
+        List<Range> evList = new ArrayList<>();
+        evList.add(new Range("eventualRange"));
+        stateManager.setEventualRanges(evList);
+        assertEquals(evList, stateManager.getEventualRanges());
+    }
+
+    @Test
+    public void testSetMdbcServerName() {
+        String serverName = "serverName";
+        stateManager.setMdbcServerName(serverName);
+        assertEquals(serverName, stateManager.getMdbcServerName());
+    }
+
+    @Test
+    public void testGetConnection() throws Exception {
+        System.out.println("Testing getting a connection");
+
+        Connection connMock = Mockito.mock(Connection.class);
+        String connName = "connectionName";
+        Map<String, Connection> connMap = new HashMap<>();
+        connMap.put(connName, connMock);
+        FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("mdbcConnections"),
+                connMap);
+        
+        TxCommitProgress txInfoMock = Mockito.mock(TxCommitProgress.class);
+        FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("transactionInfo"), 
+                txInfoMock);
+        
+        
+        assertEquals(connMock, stateManager.getConnection(connName));
+    }
+
+    @Test
+    public void testGetRangesToWarmup() throws Exception {
+        System.out.println("Testing warmup ranges where no ranges are defined");
+        
+        //getConnection
+        MdbcConnection connMock = Mockito.mock(MdbcConnection.class);
+        String connName = "daemon";
+        Map<String, Connection> connMap = new HashMap<>();
+        connMap.put(connName, connMock);
+        FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("mdbcConnections"),
+                connMap);
+        TxCommitProgress txInfoMock = Mockito.mock(TxCommitProgress.class);
+        FieldSetter.setField(stateManager, stateManager.getClass().getDeclaredField("transactionInfo"), 
+                txInfoMock);
+        
+        DBInterface dbiMock = Mockito.mock(DBInterface.class);
+        Mockito.when(connMock.getDBInterface()).thenReturn(dbiMock);
+        Set<Range> allRanges = new HashSet<>();
+        allRanges.add(new Range("rangeToWarmup"));
+        allRanges.add(new Range("rangeToWarmup2"));
+        allRanges.add(new Range("eventualRange"));
+        Mockito.when(dbiMock.getSQLRangeSet()).thenReturn(allRanges);
+        
+        List<Range> eventualRanges = new ArrayList<Range>();
+        eventualRanges.add(new Range("eventualRange"));
+        stateManager.setEventualRanges(eventualRanges);
+        
+        assertEquals(2, stateManager.getRangesToWarmup().size());
+        assertTrue(stateManager.getRangesToWarmup().contains(new Range("rangeToWarmup")));
+        assertTrue(stateManager.getRangesToWarmup().contains(new Range("rangeToWarmup2")));
+    }
+    
+    @Test
+    public void testSetWarmupRanges() {
+        Set<Range> warmupRanges = new HashSet<>();
+        warmupRanges.add(new Range("rangeToWarmup"));
+        warmupRanges.add(new Range("rangeToWarmup2"));
+        stateManager.setWarmupRanges(warmupRanges);
+        assertEquals(warmupRanges, stateManager.getRangesToWarmup());
+    }
+
+}
diff --git a/mdbc-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/mdbc-server/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644 (file)
index 0000000..1f0955d
--- /dev/null
@@ -0,0 +1 @@
+mock-maker-inline