Insert/update interchange and PS issue 84/82984/2
authorstatta <statta@research.att.com>
Thu, 21 Mar 2019 21:57:57 +0000 (17:57 -0400)
committerTschaen, Brendan <ctschaen@att.com>
Fri, 22 Mar 2019 15:07:17 +0000 (11:07 -0400)
Issue-ID: MUSIC-276
Change-Id: I9c9b990cb362adae81e621db527a0f890e00c7eb
Signed-off-by: statta <statta@research.att.com>
mdbc-server/src/main/java/org/onap/music/mdbc/MdbcPreparedStatement.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/DBInterface.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MySQLMixin.java

index 1913811..102072c 100755 (executable)
@@ -25,7 +25,6 @@ import java.math.BigDecimal;
 import java.net.URL;
 import java.sql.Array;
 import java.sql.Blob;
-import java.sql.CallableStatement;
 import java.sql.Clob;
 import java.sql.Connection;
 import java.sql.Date;
@@ -43,9 +42,7 @@ import java.sql.Statement;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.util.Calendar;
-
 import org.apache.commons.lang3.StringUtils;
-
 import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.logging.EELFLoggerDelegate;
 
@@ -673,117 +670,117 @@ public class MdbcPreparedStatement extends MdbcStatement implements PreparedStat
 
        @Override
        public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException {
-               ((CallableStatement)stmt).setTimestamp(parameterIndex, x, cal);
+               ((PreparedStatement)stmt).setTimestamp(parameterIndex, x, cal);
        }
 
        @Override
        public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException {
-               ((CallableStatement)stmt).setNull(parameterIndex, sqlType, typeName);
+               ((PreparedStatement)stmt).setNull(parameterIndex, sqlType, typeName);
        }
 
        @Override
        public void setURL(int parameterIndex, URL x) throws SQLException {
-               ((CallableStatement)stmt).setURL(parameterIndex, x);
+               ((PreparedStatement)stmt).setURL(parameterIndex, x);
        }
 
        @Override
        public ParameterMetaData getParameterMetaData() throws SQLException {
-               return ((CallableStatement)stmt).getParameterMetaData();
+               return ((PreparedStatement)stmt).getParameterMetaData();
        }
 
        @Override
        public void setRowId(int parameterIndex, RowId x) throws SQLException {
-               ((CallableStatement)stmt).setRowId(parameterIndex, x);
+               ((PreparedStatement)stmt).setRowId(parameterIndex, x);
        }
 
        @Override
        public void setNString(int parameterIndex, String value) throws SQLException {
-               ((CallableStatement)stmt).setNString(parameterIndex, value);
+               ((PreparedStatement)stmt).setNString(parameterIndex, value);
        }
 
        @Override
        public void setNCharacterStream(int parameterIndex, Reader value, long length) throws SQLException {
-               ((CallableStatement)stmt).setNCharacterStream(parameterIndex, value, length);
+               ((PreparedStatement)stmt).setNCharacterStream(parameterIndex, value, length);
        }
 
        @Override
        public void setNClob(int parameterIndex, NClob value) throws SQLException {
-               ((CallableStatement)stmt).setNClob(parameterIndex, value);
+               ((PreparedStatement)stmt).setNClob(parameterIndex, value);
        }
 
        @Override
        public void setClob(int parameterIndex, Reader reader, long length) throws SQLException {
-               ((CallableStatement)stmt).setClob(parameterIndex, reader, length);
+               ((PreparedStatement)stmt).setClob(parameterIndex, reader, length);
        }
 
        @Override
        public void setBlob(int parameterIndex, InputStream inputStream, long length) throws SQLException {
-               ((CallableStatement)stmt).setBlob(parameterIndex, inputStream, length);
+               ((PreparedStatement)stmt).setBlob(parameterIndex, inputStream, length);
        }
 
        @Override
        public void setNClob(int parameterIndex, Reader reader, long length) throws SQLException {
-               ((CallableStatement)stmt).setNClob(parameterIndex, reader, length);
+               ((PreparedStatement)stmt).setNClob(parameterIndex, reader, length);
        }
 
        @Override
        public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException {
-               ((CallableStatement)stmt).setSQLXML(parameterIndex, xmlObject);
+               ((PreparedStatement)stmt).setSQLXML(parameterIndex, xmlObject);
        }
 
        @Override
        public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException {
-               ((CallableStatement)stmt).setObject(parameterIndex, x, targetSqlType, scaleOrLength);
+               ((PreparedStatement)stmt).setObject(parameterIndex, x, targetSqlType, scaleOrLength);
        }
 
        @Override
        public void setAsciiStream(int parameterIndex, InputStream x, long length) throws SQLException {
-               ((CallableStatement)stmt).setAsciiStream(parameterIndex, x, length);
+               ((PreparedStatement)stmt).setAsciiStream(parameterIndex, x, length);
        }
 
        @Override
        public void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException {
-               ((CallableStatement)stmt).setBinaryStream(parameterIndex, x, length);
+               ((PreparedStatement)stmt).setBinaryStream(parameterIndex, x, length);
        }
 
        @Override
        public void setCharacterStream(int parameterIndex, Reader reader, long length) throws SQLException {
-               ((CallableStatement)stmt).setCharacterStream(parameterIndex, reader, length);
+               ((PreparedStatement)stmt).setCharacterStream(parameterIndex, reader, length);
        }
 
        @Override
        public void setAsciiStream(int parameterIndex, InputStream x) throws SQLException {
-               ((CallableStatement)stmt).setAsciiStream(parameterIndex, x);
+               ((PreparedStatement)stmt).setAsciiStream(parameterIndex, x);
        }
 
        @Override
        public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException {
-               ((CallableStatement)stmt).setBinaryStream(parameterIndex, x);
+               ((PreparedStatement)stmt).setBinaryStream(parameterIndex, x);
        }
 
        @Override
        public void setCharacterStream(int parameterIndex, Reader reader) throws SQLException {
-               ((CallableStatement)stmt).setCharacterStream(parameterIndex, reader);
+               ((PreparedStatement)stmt).setCharacterStream(parameterIndex, reader);
        }
 
        @Override
        public void setNCharacterStream(int parameterIndex, Reader value) throws SQLException {
-               ((CallableStatement)stmt).setNCharacterStream(parameterIndex, value);
+               ((PreparedStatement)stmt).setNCharacterStream(parameterIndex, value);
        }
 
        @Override
        public void setClob(int parameterIndex, Reader reader) throws SQLException {
-               ((CallableStatement)stmt).setClob(parameterIndex, reader);
+               ((PreparedStatement)stmt).setClob(parameterIndex, reader);
        }
 
        @Override
        public void setBlob(int parameterIndex, InputStream inputStream) throws SQLException {
-               ((CallableStatement)stmt).setBlob(parameterIndex, inputStream);
+               ((PreparedStatement)stmt).setBlob(parameterIndex, inputStream);
        }
 
        @Override
        public void setNClob(int parameterIndex, Reader reader) throws SQLException {
-               ((CallableStatement)stmt).setNClob(parameterIndex, reader);
+               ((PreparedStatement)stmt).setNClob(parameterIndex, reader);
        }
        
 }
index 85645f3..5ef2bc7 100755 (executable)
@@ -25,7 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
+import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.TableInfo;
 import org.onap.music.mdbc.tables.StagingTable;
@@ -115,12 +115,13 @@ public interface DBInterface {
         * Replay a given TxDigest into the local DB
         * @param digest
         * @throws SQLException if replay cannot occur correctly
+        * @throws MDBCServiceException 
         */
-       void replayTransaction(StagingTable digest, List<Range> ranges) throws SQLException;
+       void replayTransaction(StagingTable digest, List<Range> ranges) throws SQLException, MDBCServiceException;
 
        void disableForeignKeyChecks() throws SQLException;
 
        void enableForeignKeyChecks() throws SQLException;
 
-       void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException;
+       void applyTxDigest(StagingTable txDigest, List<Range> ranges) throws SQLException, MDBCServiceException;
 }
index ffb6c87..c6cc512 100644 (file)
@@ -1300,10 +1300,15 @@ public class MusicMixin implements MusicInterface {
     }
 
     protected void changeIsLatestToMRI(MusicRangeInformationRow row, boolean isLatest, LockResult lock) throws MDBCServiceException{
+       
+        if(lock == null)
+            return;
         PreparedQueryObject appendQuery = createChangeIsLatestToMriQuery(musicRangeInformationTableName, row.getPartitionIndex(),
             musicTxDigestTableName, isLatest);
         ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, row.getPartitionIndex().toString(),
-            appendQuery, lock.getOwnerId(), null);
+            appendQuery, 
+            lock.getOwnerId()
+            , null);
         if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
             logger.error(EELFLoggerDelegate.errorLogger, "Error when executing change isLatest operation with return type: "+returnType.getMessage());
             throw new MDBCServiceException("Error when executing change isLatest operation with return type: "+returnType.getMessage());
@@ -2057,8 +2062,11 @@ public class MusicMixin implements MusicInterface {
     }
 
     private void releaseLocks(List<MusicRangeInformationRow> changed, Map<UUID,LockResult> newLocks) throws MDBCServiceException{
+        
         for(MusicRangeInformationRow r : changed) {
             LockResult lock = newLocks.get(r.getPartitionIndex());
+            if(lock == null)
+                continue;
             unlockKeyInMusic(musicRangeInformationTableName, r.getPartitionIndex().toString(),
                 lock.getOwnerId());
             newLocks.remove(r.getPartitionIndex());
index 420f9d4..928ffa1 100755 (executable)
@@ -792,8 +792,9 @@ NEW.field refers to the new value
        /**
         * Parse the transaction digest into individual events
         * @param transaction - base 64 encoded, serialized digest
+        * @throws MDBCServiceException 
         */
-       public void replayTransaction(StagingTable transaction, List<Range> ranges) throws SQLException {
+       public void replayTransaction(StagingTable transaction, List<Range> ranges) throws SQLException, MDBCServiceException {
                boolean autocommit = jdbcConn.getAutoCommit();
                jdbcConn.setAutoCommit(false);
                Statement jdbcStmt = jdbcConn.createStatement();
@@ -803,7 +804,7 @@ NEW.field refers to the new value
                        if(Range.overlaps(ranges,op.getTable())) {
                                try {
                                        replayOperationIntoDB(jdbcStmt, op);
-                               } catch (SQLException e) {
+                               } catch (SQLException | MDBCServiceException e) {
                                        //rollback transaction
                                        logger.error("Unable to replay: " + op.getOperationType() + "->" + op.getVal() + "."
                                                + "Rolling back the entire digest replay.");
@@ -835,87 +836,148 @@ NEW.field refers to the new value
        }
 
        @Override
-       public void applyTxDigest(StagingTable txDigest,List<Range> ranges) throws SQLException {
+       public void applyTxDigest(StagingTable txDigest,List<Range> ranges) throws SQLException, MDBCServiceException {
                replayTransaction(txDigest,ranges);
        }
 
        /**
-        * Replays operation into database, usually from txDigest
-        * @param jdbcStmt
-        * @param r
-        * @param op
-        * @throws SQLException 
-        */
-       private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException {
-               logger.info("Replaying Operation: " + op.getOperationType() + "->" + op.getVal());
-               JSONObject jsonOp = op.getVal();
-
-               ArrayList<String> cols = new ArrayList<String>();
-               ArrayList<Object> vals = new ArrayList<Object>();
-               Iterator<String> colIterator = jsonOp.keys();
-               while(colIterator.hasNext()) {
-                       String col = colIterator.next();
-                       //FIXME: should not explicitly refer to cassandramixin
-                       if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
-                               //reserved name
-                               continue;
-                       }
-                       cols.add(col);
-                       vals.add(jsonOp.get(col));
-               }
-               
-               //build the queries
-               StringBuilder sql = new StringBuilder();
-               String sep = "";
-               switch (op.getOperationType()) {
-               case INSERT:
-                       sql.append(op.getOperationType() + " INTO ");
-                       sql.append(op.getTable() + " (") ;
-                       sep = "";
-                       for (String col: cols) {
-                               sql.append(sep + col);
-                               sep = ", ";
-                       }       
-                       sql.append(") VALUES (");
-                       sep = "";
-                       for (Object val: vals) {
-                               sql.append(sep + "\"" + val + "\"");
-                               sep = ", ";
-                       }
-                       sql.append(");");
-                       break;
-               case UPDATE:
-                       sql.append(op.getOperationType() + " ");
-                       sql.append(op.getTable() + " SET ");
-                       sep="";
-                       for (int i=0; i<cols.size(); i++) {
-                               sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\"");
-                               sep = ", ";
-                       }
-                       sql.append(" WHERE ");
-                       try {
-                               sql.append(getPrimaryKeyConditional(op.getKey()));
-                       } catch (MDBCServiceException e) {
-                           throw new SQLException("Update operatoin doesn't contain the required primary key",e);
-                       }
-                       sql.append(";");
-                       break;
-               case DELETE:
-                       sql.append(op.getOperationType() + " FROM ");
-                       sql.append(op.getTable() + " WHERE ");
-                       sql.append(getPrimaryKeyConditional(op.getVal()));
-                       sql.append(";");
-                       break;
-               case SELECT:
-                       //no update happened, do nothing
-                       return;
-               default:
-                       logger.error(op.getOperationType() + "not implemented for replay");
-               }
-               logger.info("Replaying operation: " + sql.toString());
-               
-               jdbcStmt.executeQuery(sql.toString());
-       }
+     * Replays operation into database, usually from txDigest
+     * @param jdbcStmt
+     * @param r
+     * @param op
+     * @throws SQLException 
+        * @throws MDBCServiceException 
+     */
+    private void replayOperationIntoDB(Statement jdbcStmt, Operation op) throws SQLException, MDBCServiceException {
+        logger.info("Replaying Operation: " + op.getOperationType() + "->" + op.getVal());
+        JSONObject jsonOp = op.getVal();
+        
+        ArrayList<String> cols = new ArrayList<String>();
+        ArrayList<Object> vals = new ArrayList<Object>();
+        Iterator<String> colIterator = jsonOp.keys();
+        while(colIterator.hasNext()) {
+            String col = colIterator.next();
+            //FIXME: should not explicitly refer to cassandramixin
+            if (col.equals(MusicMixin.MDBC_PRIMARYKEY_NAME)) {
+                //reserved name
+                continue;
+            }
+            cols.add(col);
+            vals.add(jsonOp.get(col));
+        }
+        
+        //build and replay the queries
+        StringBuilder sql = constructSQL(op, cols, vals);
+        if(sql == null)
+            return;
+        
+        try {
+            logger.info("Replaying operation: " + sql.toString());
+            int updated = jdbcStmt.executeUpdate(sql.toString());
+            
+            if(updated == 0) {
+                // This applies only for replaying transactions involving Eventually Consistent tables
+                logger.warn("Error Replaying operation: " + sql.toString() + "; Replacing insert/replace/viceversa and replaying ");
+                
+                buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);
+            }
+        } catch (SQLException sqlE) {
+            // This applies only for replaying transactions involving Eventually Consistent tables
+            logger.warn("Error Replaying operation: " + sql.toString() + "; Replacing insert/replace/viceversa and replaying ");
+            
+            buildAndExecuteSQLInverse(jdbcStmt, op, cols, vals);       
+            
+        }
+    }
+    protected void buildAndExecuteSQLInverse(Statement jdbcStmt, Operation op,
+            ArrayList<String> cols, ArrayList<Object> vals) throws SQLException, MDBCServiceException {
+        StringBuilder sqlInverse = constructSQLInverse( op, cols, vals);
+        if(sqlInverse == null)
+            return;
+        logger.info("Replaying operation: " + sqlInverse.toString());       
+        jdbcStmt.executeUpdate(sqlInverse.toString());
+    }
+    protected StringBuilder constructSQLInverse(Operation op, ArrayList<String> cols,
+            ArrayList<Object> vals) throws MDBCServiceException {
+        StringBuilder sqlInverse = null;
+        switch (op.getOperationType()) {
+            case INSERT:
+                sqlInverse = constructUpdate(op.getTable() , OperationType.UPDATE, op.getKey(), cols, vals);
+                break;
+            case UPDATE:
+                sqlInverse = constructInsert(op.getTable() , OperationType.INSERT, cols, vals);
+                break;
+            default:
+                break;
+        }
+        return sqlInverse;
+    }
+    protected StringBuilder constructSQL(Operation op, ArrayList<String> cols,
+            ArrayList<Object> vals) throws MDBCServiceException {
+        StringBuilder sql = null;
+        switch (op.getOperationType()) {
+        case INSERT:
+            sql = constructInsert(op.getTable(), op.getOperationType(), cols, vals);
+            break;
+        case UPDATE:
+            sql = constructUpdate(op.getTable(), op.getOperationType(), op.getKey(), cols, vals);
+            break;
+        case DELETE:
+            sql = constructDelete(op.getTable(), op.getOperationType(), op.getKey());
+            break;
+        case SELECT:
+            //no update happened, do nothing
+            break;
+        default:
+            logger.error(op.getOperationType() + "not implemented for replay");
+        }
+        return sql;
+    }
+    private StringBuilder constructDelete(String r, OperationType op, JSONObject key) {
+        StringBuilder sql = new StringBuilder();
+        sql.append(op + " FROM ");
+        sql.append(r  + " WHERE ");
+        sql.append(getPrimaryKeyConditional(key));
+        sql.append(";");
+        return sql;
+    }
+    private StringBuilder constructInsert(String r, OperationType  op, ArrayList<String> cols,
+            ArrayList<Object> vals) {
+        StringBuilder sql = new StringBuilder();
+        String sep;
+        sql.append(op + " INTO ");
+        sql.append(r + " (") ;
+        sep = "";
+        for (String col: cols) {
+            sql.append(sep + col);
+            sep = ", ";
+        }   
+        sql.append(") VALUES (");
+        sep = "";
+        for (Object val: vals) {
+            sql.append(sep + "\"" + val + "\"");
+            sep = ", ";
+        }
+        sql.append(");");
+        return sql;
+    }
+    private StringBuilder constructUpdate(String r, OperationType op, JSONObject key, ArrayList<String> cols,
+            ArrayList<Object> vals) {
+        StringBuilder sql = new StringBuilder();
+        String sep;
+        sql.append(op + " ");
+        sql.append(r + " SET ");
+        sep="";
+        for (int i=0; i<cols.size(); i++) {
+            sql.append(sep + cols.get(i) + "=\"" + vals.get(i) +"\"");
+            sep = ", ";
+        }
+        sql.append(" WHERE ");
+        sql.append(getPrimaryKeyConditional(key));
+        sql.append(";");
+        
+        return sql;
+    }
        
        /**
         * Create an SQL string for AND'ing all of the primary keys