Split partition using MRU 09/91109/5
authorTschaen, Brendan <ctschaen@att.com>
Tue, 9 Jul 2019 19:02:22 +0000 (15:02 -0400)
committerTschaen, Brendan <ctschaen@att.com>
Thu, 25 Jul 2019 17:45:46 +0000 (13:45 -0400)
Most recently used approach to splitting a partition.
Split happens when committing, we see what we've used
from existing partition and splits accordingly. This
feature is configurable via the mdbc.properties file.

Issue-ID: MUSIC-404
Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
Change-Id: Ie33f4d1ae0e6253678062558015c2a10c4fae614

mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
mdbc-server/src/main/java/org/onap/music/mdbc/MdbcServer.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
mdbc-server/src/main/resources/mdbc.properties
mdbc-server/src/test/java/org/onap/music/mdbc/MdbcTestUtils.java
mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MySQLMixinTest.java

index 2294673..6f097dd 100755 (executable)
@@ -82,9 +82,13 @@ public class MdbcConnection implements Connection {
     private final TxCommitProgress progressKeeper;
     private final DBInterface dbi;
     private final StagingTable transactionDigest;
+    /** Set of tables in db */
     private final Set<String> table_set;
     private final StateManager statemanager;
+    /** partition owned for this transaction */
     private DatabasePartition partition;
+    /** ranges needed for this transaction */
+    private Set<Range> rangesUsed;
 
     public MdbcConnection(String id, String url, Connection c, Properties info, MusicInterface mi,
             TxCommitProgress progressKeeper, DatabasePartition partition, StateManager statemanager) throws MDBCServiceException {
@@ -186,6 +190,13 @@ public class MdbcConnection implements Connection {
         }
 
         dbi.preCommitHook();
+        try {
+            partition = mi.splitPartitionIfNecessary(partition, rangesUsed);
+        } catch (MDBCServiceException e) {
+            logger.warn(EELFLoggerDelegate.errorLogger, "Failure to split partition, trying to continue",
+                    AppMessages.UNKNOWNERROR, ErrorTypes.UNKNOWN, ErrorSeverity.FATAL);
+        }
+        
         try {
             logger.debug(EELFLoggerDelegate.applicationLogger, " commit ");
             // transaction was committed -- add all the updates into the REDO-Log in MUSIC
@@ -519,16 +530,15 @@ public class MdbcConnection implements Connection {
         //Check ownership of keys
         String defaultSchema = dbi.getSchema();
         Set<Range> queryTables = MDBCUtils.getTables(defaultSchema, tableToQueryType);
-        if (this.partition!=null) {
-            Set<Range> snapshot = this.partition.getSnapshot();
-            if(snapshot!=null){
-                queryTables.addAll(snapshot);
-            }
+        if (this.rangesUsed==null) {
+            rangesUsed = queryTables;
+        } else {
+            rangesUsed.addAll(queryTables);
         }
         // filter out ranges that fall under Eventually consistent
         // category as these tables do not need ownership
-        Set<Range> scQueryTables = filterEveTables(queryTables);
-        DatabasePartition tempPartition = own(scQueryTables, MDBCUtils.getOperationType(tableToQueryType));
+        Set<Range> scRanges = filterEveTables(rangesUsed);
+        DatabasePartition tempPartition = own(scRanges, MDBCUtils.getOperationType(tableToQueryType));
         if(tempPartition!=null && tempPartition != partition) {
             this.partition.updateDatabasePartition(tempPartition);
             statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition);
index 500ed81..246044b 100755 (executable)
@@ -101,6 +101,7 @@ public class MdbcServer {
        // Then start it
        server.start();
          
+       System.out.println("Started Avatica server on port " + server.getPort());
        logger.info("Started Avatica server on port {} with serialization {}", server.getPort(),
                        serialization);
     } catch (Exception e) {
index 637cb15..3afc726 100755 (executable)
@@ -181,6 +181,7 @@ public interface MusicInterface {
 
        /**
         * Commits the corresponding REDO-log into MUSIC
+        * Transaction is committed -- add all the updates into the REDO-Log in MUSIC
         *
         * @param partition information related to ownership of partitions, used to verify ownership when commiting the Tx
         * @param eventualRanges 
@@ -337,7 +338,19 @@ public interface MusicInterface {
      */
     OwnershipReturn mergeLatestRowsIfNecessary(Dag currentlyOwned, Map<UUID, LockResult> locksForOwnership, UUID ownershipId)
             throws MDBCServiceException;
-    
+
+    /**
+     * If this connection is using fewer ranges than what is owned in the current partition, split
+     * the partition to avoid a universal partition being passed around.
+     * 
+     * This will follow "most recently used" policy
+     * @param partition2 partition that this transaction currently owns
+     * @param rangesUsed set of ranges that is the minimal required for this transaction
+     * @throws MDBCServiceException 
+     */
+    public DatabasePartition splitPartitionIfNecessary(DatabasePartition partition, Set<Range> rangesUsed)
+            throws MDBCServiceException;
+
     /**
      * Create ranges in MRI table, if not already present
      * @param range to add into mri table
index 5581573..e87f7e4 100644 (file)
@@ -104,6 +104,8 @@ public class MusicMixin implements MusicInterface {
     public static final String KEY_TIMEOUT = "mdbc_timeout";
     /**  The property name to use to provide a flag indicating if compression is required */
     public static final String KEY_COMPRESSION = "mdbc_compression";
+    /**  The property name to use to provide a flag indicating if mri row splits is allowable */
+    public static final String KEY_SPLIT = "partition_splitting";
     /** Namespace for the tables in MUSIC (Cassandra) */
     public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
     /** The default property value to use for the Cassandra IP address. */
@@ -197,9 +199,9 @@ public class MusicMixin implements MusicInterface {
     private Set<String> in_progress    = Collections.synchronizedSet(new HashSet<String>());
     private StateManager stateManager;
     private boolean useCompression;
+    private boolean splitAllowed;
 
     public MusicMixin() {
-
         //this.logger         = null;
         this.musicAddress   = null;
         this.music_ns       = null;
@@ -237,6 +239,9 @@ public class MusicMixin implements MusicInterface {
         String s            = info.getProperty(KEY_MUSIC_RFACTOR);
         this.music_rfactor  = (s == null) ? DEFAULT_MUSIC_RFACTOR : Integer.parseInt(s);
 
+        String split = info.getProperty(KEY_SPLIT);
+        this.splitAllowed = (split == null) ? true: Boolean.parseBoolean(split);
+        
         initializeMetricTables();
         commitExecutorThreads = Executors.newFixedThreadPool(4);
     }
@@ -1263,15 +1268,15 @@ public class MusicMixin implements MusicInterface {
         return lockId;
     }
 
-    protected void changeIsLatestToMRI(MusicRangeInformationRow row, boolean isLatest, LockResult lock) throws MDBCServiceException{
+    protected void changeIsLatestToMRI(UUID mrirow, boolean isLatest, String lockref) throws MDBCServiceException{
        
-        if(lock == null)
+        if(lockref == null)
             return;
-        PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, row.getPartitionIndex(),
+        PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, mrirow,
             musicTxDigestTableName, isLatest);
-        ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(),
+        ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, mrirow.toString(),
             appendQuery, 
-            lock.getLockId()
+            lockref
             , null);
         if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
             logger.error(EELFLoggerDelegate.errorLogger, "Error when executing change isLatest operation with return type: "+returnType.getMessage());
@@ -2140,7 +2145,10 @@ public class MusicMixin implements MusicInterface {
         List<MusicRangeInformationRow> returnInfo = new ArrayList<>();
         List<DagNode> toDisable = latestDag.getOldestDoubles();
         for(DagNode node : toDisable){
-            changeIsLatestToMRI(node.getRow(),false,locks.get(node.getId()));
+            LockResult lockToDisable = locks.get(node.getId());
+            if (lockToDisable!=null) {
+                changeIsLatestToMRI(node.getRow().getPartitionIndex(),false,lockToDisable.getLockId());
+            }
             latestDag.setIsLatest(node.getId(),false);
             returnInfo.add(node.getRow());
         }
@@ -2185,6 +2193,54 @@ public class MusicMixin implements MusicInterface {
         return new OwnershipReturn(ownershipId, createdRow.getDBPartition().getLockId(), createdRow.getPartitionIndex(),
                 createdRow.getDBPartition().getSnapshot(), currentlyOwned);
     }
+    
+    
+    @Override
+    public DatabasePartition splitPartitionIfNecessary(DatabasePartition partition, Set<Range> rangesUsed)
+            throws MDBCServiceException {
+        if (!this.splitAllowed) {
+            return partition;
+        }
+        Set<Range> rangesOwned = partition.getSnapshot();
+        if (rangesOwned==null || rangesUsed==null) {
+            return partition;
+        }
+        if (!rangesOwned.containsAll(rangesUsed)) {
+            throw new MDBCServiceException("Transaction was unable to acquire all necessary ranges.");
+        }
+
+        if (rangesUsed.containsAll(rangesOwned)) {
+            //using all ranges in this partition
+            return partition;
+        }
+
+        //split partition
+        logger.info(EELFLoggerDelegate.applicationLogger, "Full partition not being used need (" + rangesUsed
+                +") and own (" + rangesOwned + ", splitting the partition");
+        Set<UUID> prevPartitions = new HashSet<>();
+        prevPartitions.add(partition.getMRIIndex());
+        MusicRangeInformationRow usedRow = createAndAssignLock(rangesUsed, prevPartitions);
+        rangesOwned.removeAll(rangesUsed);
+        Set<Range> rangesNotUsed = rangesOwned;
+        MusicRangeInformationRow unusedRow = createAndAssignLock(rangesNotUsed, prevPartitions);
+
+        changeIsLatestToMRI(partition.getMRIIndex(), false, partition.getLockId());
+
+        Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
+        for (Range range: rangesUsed) {
+            alreadyApplied.put(range, Pair.of(new MriReference(usedRow.getPartitionIndex()), -1));
+        }
+        for (Range range: rangesNotUsed) {
+            alreadyApplied.put(range, Pair.of(new MriReference(unusedRow.getPartitionIndex()), -1));
+        }
+
+        //release/update old partition info
+        relinquish(unusedRow.getDBPartition());
+        relinquish(partition);
+
+        return usedRow.getDBPartition();
+    }
+    
 
     private MusicRangeInformationRow createAndAssignLock(Set<Range> ranges, Set<UUID> prevPartitions) throws MDBCServiceException {
         UUID newUUID = MDBCUtils.generateTimebasedUniqueKey();
index d4581d7..3af6f0f 100755 (executable)
@@ -185,7 +185,7 @@ public class MySQLMixin implements DBInterface {
         String dbname = "mdbc"; // default name
         try {
             Statement stmt = conn.createStatement();
-            ResultSet rs = stmt.executeQuery("SELECT DATABASE() AS DB");
+            ResultSet rs = stmt.executeQuery("SELECT UPPER(DATABASE()) AS DB");
             if (rs.next()) {
                 dbname = rs.getString("DB");
             }
@@ -212,7 +212,7 @@ public class MySQLMixin implements DBInterface {
     public Set<String> getSQLTableSet() {
         Set<String> set = new TreeSet<String>();
         String sql =
-                "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+                "SELECT CONCAT(UPPER(TABLE_SCHEMA), '.', UPPER(TABLE_NAME)) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE';";
         try {
             Statement stmt = jdbcConn.createStatement();
             ResultSet rs = stmt.executeQuery(sql);
@@ -232,7 +232,7 @@ public class MySQLMixin implements DBInterface {
     public Set<Range> getSQLRangeSet() {
         Set<String> set = new TreeSet<String>();
         String sql =
-                "SELECT CONCAT(TABLE_SCHEMA, '.', TABLE_NAME) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE'";
+                "SELECT CONCAT(UPPER(TABLE_SCHEMA), '.', UPPER(TABLE_NAME)) as TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA=DATABASE() AND TABLE_TYPE='BASE TABLE';";
         try {
             Statement stmt = jdbcConn.createStatement();
             ResultSet rs = stmt.executeQuery(sql);
index 658d127..c95644b 100644 (file)
@@ -373,6 +373,12 @@ public class OwnershipAndCheckpoint{
                         false, partition.getSnapshot()));
             } else if ( ownershipLocks.containsKey(uuidToOwn) || !row.getIsLatest() ) {
                 toOwn.setOwn(node);
+                if (ownershipLocks.containsKey(uuidToOwn) && !row.getIsLatest()) {
+                    //previously owned partition that is no longer latest, don't need anymore
+                    LockResult result = ownershipLocks.get(uuidToOwn);
+                    ownershipLocks.remove(uuidToOwn);
+                    mi.relinquish(result.getLockId(), uuidToOwn.toString());
+                }
             } else {
                 LockRequest request = new LockRequest(uuidToOwn,
                         new ArrayList<>(node.getRangeSet()), lockType);
index 49fdfd2..4d69da6 100755 (executable)
@@ -15,4 +15,8 @@ DEFAULT_DRIVERS=\
         org.mariadb.jdbc.Driver \
         org.postgresql.Driver
 
-txdaemonsleeps=15
+# whether or not to split the partitions
+partition_splitting=true
+
+#time, in seconds, between when the daemon catches up
+txdaemonsleeps=15
\ No newline at end of file
index 72ec8d3..f7dd8ee 100644 (file)
@@ -68,7 +68,7 @@ public class MdbcTestUtils {
     final private static String nodeInfoTableName = "nodeinfo";
     //Mariadb variables
     static DB db=null;
-    final public static String mariaDBDatabaseName="test";
+    final public static String mariaDBDatabaseName="TEST";
     final static Integer mariaDbPort=13306;
 
 
index 1f2c1dd..bd493c7 100644 (file)
@@ -72,7 +72,7 @@ public class MySQLMixinTest {
        
        @Test
        public void testGetDataBaseName() throws SQLException {
-               Assert.assertEquals(MdbcTestUtils.getMariaDBDBName(), mysqlMixin.getDatabaseName());
+               Assert.assertEquals(MdbcTestUtils.getMariaDBDBName().toUpperCase(), mysqlMixin.getDatabaseName());
        }
 
 }