Remove ownership logic from mixin 42/79742/1
authorTschaen, Brendan <ctschaen@att.com>
Tue, 5 Mar 2019 22:46:20 +0000 (17:46 -0500)
committerTschaen, Brendan <ctschaen@att.com>
Tue, 5 Mar 2019 22:46:20 +0000 (17:46 -0500)
Change-Id: I70d62e76e23c690726294c62b9222c4cd9659c70
Issue-ID: MUSIC-326
Signed-off-by: Tschaen, Brendan <ctschaen@att.com>
12 files changed:
mdbc-server/src/main/java/org/onap/music/mdbc/MdbcConnection.java
mdbc-server/src/main/java/org/onap/music/mdbc/StateManager.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockRequest.java [new file with mode: 0644]
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockResult.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MixinFactory.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/Music2Mixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicInterface.java
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
mdbc-server/src/main/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpoint.java
mdbc-server/src/main/java/org/onap/music/mdbc/tables/MusicTxDigest.java
mdbc-server/src/test/java/org/onap/music/mdbc/mixins/MusicMixinTest.java
mdbc-server/src/test/java/org/onap/music/mdbc/ownership/OwnershipAndCheckpointTest.java

index 0793a67..7377c4f 100755 (executable)
@@ -507,7 +507,7 @@ public class MdbcConnection implements Connection {
         DatabasePartition tempPartition = own(scQueryTables);
         if(tempPartition!=null && tempPartition != partition) {
             this.partition.updateDatabasePartition(tempPartition);
-            mi.reloadAlreadyApplied(this.partition);
+            statemanager.getOwnAndCheck().reloadAlreadyApplied(this.partition);
         }
       dbi.preStatementHook(sql);
     }
@@ -575,10 +575,10 @@ public class MdbcConnection implements Connection {
             return null;
         }
         DatabasePartition newPartition = null;
-        OwnershipAndCheckpoint ownAndCheck = mi.getOwnAndCheck();
+        OwnershipAndCheckpoint ownAndCheck = statemanager.getOwnAndCheck();
         UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
         try {
-            final OwnershipReturn ownershipReturn = mi.own(ranges, partition, ownOpId);
+            final OwnershipReturn ownershipReturn = ownAndCheck.own(mi, ranges, partition, ownOpId);
             if(ownershipReturn==null){
                 return null;
             }
index 6cc50ec..8e7976f 100644 (file)
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.tuple.Pair;
 import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.logging.EELFLoggerDelegate;
 import org.onap.music.logging.format.AppMessages;
@@ -33,6 +34,8 @@ import org.onap.music.mdbc.mixins.DBInterface;
 import org.onap.music.mdbc.mixins.MixinFactory;
 import org.onap.music.mdbc.mixins.MusicInterface;
 import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
+import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
+import org.onap.music.mdbc.tables.MriReference;
 import org.onap.music.mdbc.tables.MusicTxDigest;
 import org.onap.music.mdbc.tables.TxCommitProgress;
 
@@ -79,6 +82,11 @@ public class StateManager {
     String cassandraUrl;
     private Properties info;
     
+    /**  The property name to use to provide a timeout to mdbc (ownership) */
+    public static final String KEY_TIMEOUT = "mdbc_timeout";
+    /** The default property value to use for the MDBC timeout */
+    public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
+    
     /** Identifier for this server instance */
     private String mdbcServerName;
     private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
@@ -86,6 +94,8 @@ public class StateManager {
     private List<Range> eventualRanges;
     private final Lock warmupLock = new ReentrantLock();
     private List<Range> warmupRanges;
+    private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
+    private OwnershipAndCheckpoint ownAndCheck;
 
        public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
         this.sqlDBName = sqlDBName;
@@ -108,6 +118,10 @@ public class StateManager {
         initMusic();
         initSqlDatabase(); 
 
+        String t = info.getProperty(KEY_TIMEOUT);
+        long timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t);
+        alreadyApplied = new ConcurrentHashMap<>();
+        ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied, timeout);
         
         MusicTxDigest txDaemon = new MusicTxDigest(this);
         txDaemon.startBackgroundDaemon(Integer.parseInt(
@@ -119,7 +133,7 @@ public class StateManager {
      * @throws MDBCServiceException
      */
     protected void initMusic() throws MDBCServiceException {
-        this.musicInterface = MixinFactory.createMusicInterface(musicmixin, mdbcServerName, info);
+        this.musicInterface = MixinFactory.createMusicInterface(this, musicmixin, mdbcServerName, info);
         this.mdbcConnections = new HashMap<>();
     }
     
@@ -326,4 +340,8 @@ public class StateManager {
             warmupLock.unlock();
         }
     }
+    
+    public OwnershipAndCheckpoint getOwnAndCheck() {
+        return ownAndCheck;
+    }
 }
diff --git a/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockRequest.java b/mdbc-server/src/main/java/org/onap/music/mdbc/mixins/LockRequest.java
new file mode 100644 (file)
index 0000000..e8b400b
--- /dev/null
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc.mixins;
+
+import java.util.List;
+import java.util.UUID;
+import org.onap.music.mdbc.Range;
+
+public class LockRequest {
+    private final String table;
+    private final UUID id;
+    private final List<Range> toLockRanges;
+    private int numOfAttempts;
+
+    /**
+     * 
+     * @param table
+     * @param id
+     * @param toLockRanges
+     */
+    public LockRequest(String table, UUID id, List<Range> toLockRanges) {
+        this.table = table;
+        this.id = id;
+        this.toLockRanges = toLockRanges;
+        numOfAttempts = 1;
+    }
+
+    public UUID getId() {
+        return id;
+    }
+
+    public List<Range> getToLockRanges() {
+        return toLockRanges;
+    }
+
+    public String getTable() {
+        return table;
+    }
+    
+    /**
+     * Number of times you've requested this lock
+     * @return
+     */
+    public int getNumOfAttempts() {
+        return numOfAttempts;
+    }
+
+    public void incrementAttempts() {
+        numOfAttempts++;        
+    }
+}
\ No newline at end of file
index 7dd92c4..8055cae 100644 (file)
@@ -25,24 +25,68 @@ import java.util.UUID;
 import org.onap.music.mdbc.Range;
 
 public class LockResult{
-    private final UUID musicRangeInformationIndex;
-    private final String ownerId;
+    private boolean successful;
+    private UUID musicRangeInformationIndex;
+    private String ownerId;
     private List<Range> ranges;
-    private final boolean newLock;
+    private boolean newLock;
+    private long backOffPeriodS;
 
+    public LockResult(boolean succesful, UUID rowId, String ownerId, boolean newLock, List<Range> ranges){
+        this.successful = true;
+        this.musicRangeInformationIndex = rowId;
+        this.ownerId=ownerId;
+        this.newLock=newLock;
+        this.ranges=ranges;
+    }
+    /**
+     * Please use constructor which specifies whether lock result was succesful
+     * @param rowId
+     * @param ownerId
+     * @param newLock
+     * @param ranges
+     */
+    @Deprecated
     public LockResult(UUID rowId, String ownerId, boolean newLock, List<Range> ranges){
+        this.successful = true;
         this.musicRangeInformationIndex = rowId;
         this.ownerId=ownerId;
         this.newLock=newLock;
         this.ranges=ranges;
     }
+    public LockResult(boolean successful, long backOffTimems) {
+        this.successful = successful;
+        this.backOffPeriodS = backOffTimems;
+    }
+    
+    public boolean wasSuccessful() {
+        return successful;
+    }
+    
     public String getOwnerId(){
         return ownerId;
     }
     public boolean isNewLock(){
         return newLock;
     }
-    public UUID getIndex() {return musicRangeInformationIndex;}
-    public List<Range> getRanges() {return ranges;}
-    public void addRange(Range range){ranges.add(range);}
+    public UUID getIndex() {
+        return musicRangeInformationIndex;
+    }
+    
+    public List<Range> getRanges() {
+        return ranges;
+    }
+    
+    public void addRange(Range range) {
+        ranges.add(range);
+    }
+    
+    /**
+     * Get the backOffPeriod, in milliseconds, requested by mixin
+     * @return
+     */
+    public long getBackOffPeriod() {
+        return this.backOffPeriodS;
+    }
+    
 }
\ No newline at end of file
index 1edb38d..d822615 100755 (executable)
@@ -25,6 +25,7 @@ import java.sql.Connection;
 import java.util.Properties;
 
 import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.StateManager;
 
 /**
  * This class is used to construct instances of Mixins that implement either the {@link org.onap.music.mdbc.mixins.DBInterface}
@@ -78,7 +79,7 @@ public class MixinFactory {
         * @param info the Properties to use as an argument to the constructor
         * @return the newly constructed MusicInterface, or null if one cannot be found.
         */
-       public static MusicInterface createMusicInterface(String name, String mdbcServerName, Properties info) {
+       public static MusicInterface createMusicInterface(StateManager statemanager, String name, String mdbcServerName, Properties info) {
                for (Class<?> cl : Utils.getClassesImplementing(MusicInterface.class)) {
                        try {
                                Constructor<?> con = cl.getConstructor();
@@ -87,10 +88,10 @@ public class MixinFactory {
                                        String miname = mi.getMixinName();
                                        logger.info(EELFLoggerDelegate.applicationLogger, "Checking "+miname);
                                        if (miname.equalsIgnoreCase(name)) {
-                                               con = cl.getConstructor(String.class, Properties.class);
+                                               con = cl.getConstructor(StateManager.class, String.class, Properties.class);
                                                if (con != null) {
                                                        logger.info(EELFLoggerDelegate.applicationLogger,"Found match: "+miname);
-                                                       return (MusicInterface) con.newInstance(mdbcServerName, info);
+                                                       return (MusicInterface) con.newInstance(statemanager, mdbcServerName, info);
                                                }
                                        }
                                }
index 8181159..591f830 100755 (executable)
@@ -37,6 +37,7 @@ import org.onap.music.main.ReturnType;
 
 import org.onap.music.logging.EELFLoggerDelegate;
 import org.onap.music.mdbc.DatabasePartition;
+import org.onap.music.mdbc.StateManager;
 import org.onap.music.mdbc.TableInfo;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
@@ -58,8 +59,8 @@ public class Music2Mixin extends MusicMixin {
                super();
        }
 
-       public Music2Mixin(String url, Properties info) throws MDBCServiceException {
-               super(url, info);
+       public Music2Mixin(StateManager stateManager, String url, Properties info) throws MDBCServiceException {
+               super(stateManager, url, info);
        }
 
        /**
index c38efb7..00f6d00 100755 (executable)
@@ -211,6 +211,8 @@ public interface MusicInterface {
      */
        RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException;
 
+    List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException;
+       
        /**
      * This function is used to create a new row in the MRI table
      * @param info the information used to create the row
@@ -268,17 +270,6 @@ public interface MusicInterface {
      */
        StagingTable getTxDigest(MusicTxDigestId id) throws MDBCServiceException;
 
-    /**
-     * Use this functions to verify ownership, and own new ranges
-     * @param ranges the ranges that should be own after calling this function
-     * @param partition current information of the ownership in the system
-     * @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is
-     *                required
-        * @return an object indicating the status of the own function result
-     * @throws MDBCServiceException
-     */
-       OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID ownOpId) throws MDBCServiceException;
-
     /**
      * This function relinquish ownership, if it is time to do it, it should be used at the end of a commit operation
      * @param partition information of the partition that is currently being owned
@@ -326,11 +317,11 @@ public interface MusicInterface {
     void deleteOldMriRows(Map<UUID,String> oldRowsAndLocks) throws MDBCServiceException;
 
     List<MusicRangeInformationRow> getAllMriRows() throws MDBCServiceException;
-
-    OwnershipAndCheckpoint getOwnAndCheck();
-
-    void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException;
     
     public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException;
+    public LockResult requestLock(LockRequest request) throws MDBCServiceException;
+    public void releaseLocks(Map<UUID, LockResult> newLocks) throws MDBCServiceException;
+    public OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
+            Map<UUID, LockResult> locks, UUID ownershipId) throws MDBCServiceException;
 }
 
index 8a2ef6f..ffb6c87 100644 (file)
@@ -56,6 +56,7 @@ import org.onap.music.main.ReturnType;
 import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.MDBCUtils;
 import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.StateManager;
 import org.onap.music.mdbc.TableInfo;
 import org.onap.music.mdbc.ownership.Dag;
 import org.onap.music.mdbc.ownership.DagNode;
@@ -105,16 +106,12 @@ public class MusicMixin implements MusicInterface {
     public static final String KEY_MUSIC_RFACTOR      = "music_rfactor";
     /** The property name to use to provide the replication factor for Cassandra. */
     public static final String KEY_MUSIC_NAMESPACE = "music_namespace";
-    /**  The property name to use to provide a timeout to mdbc (ownership) */
-    public static final String KEY_TIMEOUT = "mdbc_timeout";
     /** Namespace for the tables in MUSIC (Cassandra) */
     public static final String DEFAULT_MUSIC_NAMESPACE = "namespace";
     /** The default property value to use for the Cassandra IP address. */
     public static final String DEFAULT_MUSIC_ADDRESS  = "localhost";
     /** The default property value to use for the Cassandra replication factor. */
     public static final int    DEFAULT_MUSIC_RFACTOR  = 1;
-    /** The default property value to use for the MDBC timeout */
-    public static final long DEFAULT_TIMEOUT = 5*60*60*1000;//default of 5 hours
     /** The default primary string column, if none is provided. */
     public static final String MDBC_PRIMARYKEY_NAME = "mdbc_cuid";
     /** Type of the primary key, if none is defined by the user */
@@ -124,7 +121,7 @@ public class MusicMixin implements MusicInterface {
     //\TODO Add logic to change the names when required and create the tables when necessary
     private String musicTxDigestTableName = "musictxdigest";
     private String musicEventualTxDigestTableName = "musicevetxdigest";
-    private String musicRangeInformationTableName = "musicrangeinformation";
+    public static final String musicRangeInformationTableName = "musicrangeinformation";
     private String musicRangeDependencyTableName = "musicrangedependency";
     private String musicNodeInfoTableName = "nodeinfo";
 
@@ -157,27 +154,6 @@ public class MusicMixin implements MusicInterface {
         }
     }
 
-    private class LockRequest{
-        private final String table;
-        private final UUID id;
-        private final List<Range> toLockRanges;
-        public LockRequest(String table, UUID id, List<Range> toLockRanges){
-            this.table=table;
-            this.id=id;
-            this.toLockRanges=toLockRanges;
-        }
-        public UUID getId() {
-            return id;
-        }
-        public List<Range> getToLockRanges() {
-            return toLockRanges;
-        }
-
-        public String getTable() {
-            return table;
-        }
-    }
-
 
     private static final Map<Integer, String> typemap         = new HashMap<>();
     static {
@@ -207,14 +183,13 @@ public class MusicMixin implements MusicInterface {
     protected final String[] allReplicaIds;
     private final String musicAddress;
     private final int    music_rfactor;
-    protected long timeout;
     private MusicConnector mCon        = null;
     private Session musicSession       = null;
     private boolean keyspace_created   = false;
     private Map<String, PreparedStatement> ps_cache = new HashMap<>();
     private Set<String> in_progress    = Collections.synchronizedSet(new HashSet<String>());
-    private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
-    private OwnershipAndCheckpoint ownAndCheck;
+    private StateManager stateManager;
+    
 
     public MusicMixin() {
 
@@ -226,7 +201,7 @@ public class MusicMixin implements MusicInterface {
         this.allReplicaIds  = null;
     }
 
-    public MusicMixin(String mdbcServerName, Properties info) throws MDBCServiceException {
+    public MusicMixin(StateManager stateManager, String mdbcServerName, Properties info) throws MDBCServiceException {
         // Default values -- should be overridden in the Properties
         // Default to using the host_ids of the various peers as the replica IDs (this is probably preferred)
         this.musicAddress   = info.getProperty(KEY_MUSIC_ADDRESS, DEFAULT_MUSIC_ADDRESS);
@@ -245,12 +220,8 @@ public class MusicMixin implements MusicInterface {
         this.music_ns       = info.getProperty(KEY_MUSIC_NAMESPACE,DEFAULT_MUSIC_NAMESPACE);
         logger.info(EELFLoggerDelegate.applicationLogger,"MusicSqlManager: music_ns="+music_ns);
 
-        String t = info.getProperty(KEY_TIMEOUT);
-        this.timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t);
-
-        alreadyApplied = new ConcurrentHashMap<>();
-        ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied,timeout);
-
+        this.stateManager = stateManager;
+        
         initializeMetricTables();
     }
 
@@ -1354,7 +1325,7 @@ public class MusicMixin implements MusicInterface {
      */
     @Override
     public void commitLog(DatabasePartition partition,List<Range> eventualRanges,  StagingTable transactionDigest,
-                          String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+                          String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException {
         // first deal with commit for eventually consistent tables
         filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
         
@@ -1374,8 +1345,7 @@ public class MusicMixin implements MusicInterface {
         //0. See if reference to lock was already created
         String lockId = partition.getLockId();
         if(mriIndex==null || lockId == null || lockId.isEmpty()) {
-            //\TODO fix this
-            own(partition.getSnapshot(),partition, MDBCUtils.generateTimebasedUniqueKey());
+            throw new MDBCServiceException("Not able to commit, as you are no longer the lock-holder for this partition");
         }
 
         UUID commitId;
@@ -1407,6 +1377,7 @@ public class MusicMixin implements MusicInterface {
             appendToRedoLog(partition, digestId);
             List<Range> ranges = partition.getSnapshot();
             for(Range r : ranges) {
+                Map<Range, Pair<MriReference, Integer>> alreadyApplied = stateManager.getOwnAndCheck().getAlreadyApplied();
                 if(!alreadyApplied.containsKey(r)){
                     throw new MDBCServiceException("already applied data structure was not updated correctly and range "
                     +r+" is not contained");
@@ -1420,12 +1391,7 @@ public class MusicMixin implements MusicInterface {
                 alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1));
             }
         }
-    }
-
-    public void cleanAlreadyApplied(){
-        logger.warn("Use this only in test environments");
-        alreadyApplied.clear();
-    }
+    }    
 
     private void filterAndAddEventualTxDigest(List<Range> eventualRanges,
             StagingTable transactionDigest, String txId,
@@ -1585,7 +1551,7 @@ public class MusicMixin implements MusicInterface {
     }
 
     @Override
-    public RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException{
+    public RangeDependency getMusicRangeDependency(Range baseRange) throws MDBCServiceException {
         String cql = String.format("SELECT * FROM %s.%s WHERE range = ?;", music_ns, musicRangeDependencyTableName);
         PreparedQueryObject pQueryObject = new PreparedQueryObject();
         pQueryObject.appendQueryString(cql);
@@ -1964,15 +1930,7 @@ public class MusicMixin implements MusicInterface {
         return ecDigestInformation;
     }
 
-    @Override
-    public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
-        List<Range> snapshot = partition.getSnapshot();
-        UUID row = partition.getMRIIndex();
-        for(Range r : snapshot){
-            alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
-        }
-
-    }
+    
 
 
     ResultSet getAllMriCassandraRows() throws MDBCServiceException {
@@ -2076,13 +2034,13 @@ public class MusicMixin implements MusicInterface {
 
     private List<Range> lockRow(LockRequest request, DatabasePartition partition,Map<UUID, LockResult> rowLock)
         throws MDBCServiceException {
-        if(partition.getMRIIndex().equals(request.id) && partition.isLocked()){
+        if(partition.getMRIIndex().equals(request.getId()) && partition.isLocked()){
             return new ArrayList<>();
         }
         //\TODO: this function needs to be improved, to track possible changes in the owner of a set of ranges
-        String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+request.id.toString();
+        String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+request.getId().toString();
         //return List<Range> knownRanges, UUID mriIndex, String lockId
-        DatabasePartition newPartition = new DatabasePartition(request.toLockRanges,request.id,null);
+        DatabasePartition newPartition = new DatabasePartition(request.getToLockRanges(),request.getId(),null);
         return waitForLock(request,newPartition,rowLock);
     }
 
@@ -2091,7 +2049,8 @@ public class MusicMixin implements MusicInterface {
         MusicCore.destroyLockRef(fullyQualifiedKey,lockref);
     }
 
-    private void releaseLocks(Map<UUID,LockResult> newLocks) throws MDBCServiceException{
+    @Override
+    public void releaseLocks(Map<UUID,LockResult> newLocks) throws MDBCServiceException{
         for(Map.Entry<UUID,LockResult> lock : newLocks.entrySet()) {
             unlockKeyInMusic(musicRangeInformationTableName, lock.getKey().toString(), lock.getValue().getOwnerId());
         }
@@ -2126,7 +2085,8 @@ public class MusicMixin implements MusicInterface {
      * @return
      * @throws MDBCServiceException
      */
-    private List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException{
+    @Override
+    public List<Range> getRangeDependencies(List<Range> range) throws MDBCServiceException{
         Set<Range> extendedRange = new HashSet<>();
         for(Range r: range){
             extendedRange.add(r);
@@ -2138,30 +2098,22 @@ public class MusicMixin implements MusicInterface {
         return new ArrayList<>(extendedRange);
     }
 
-    private LockResult waitForLock(LockRequest request) throws MDBCServiceException{
+    @Override
+    public LockResult requestLock(LockRequest request) throws MDBCServiceException{
         String fullyQualifiedKey= music_ns+"."+ request.getTable()+"."+request.getId();
         String lockId = MusicCore.createLockReference(fullyQualifiedKey);
         ReturnType lockReturn = acquireLock(fullyQualifiedKey,lockId);
-        if(lockReturn.getResult().compareTo(ResultType.SUCCESS) != 0 ) {
+        if(lockReturn.getResult() == ResultType.FAILURE) {
             //\TODO Improve the exponential backoff
-            int n = 1;
+            int n = request.getNumOfAttempts();
             int low = 1;
             int high = 1000;
             Random r = new Random();
-            while(MusicCore.whoseTurnIsIt(fullyQualifiedKey) != lockId){
-                try {
-                    Thread.sleep(((int) Math.round(Math.pow(2, n)) * 1000)
-                        + (r.nextInt(high - low) + low));
-                } catch (InterruptedException e) {
-                    continue;
-                }
-                n++;
-                if (n == 20) {
-                    throw new MDBCServiceException("Lock was impossible to obtain, waited for 20 exponential backoffs!");
-                }
-            }
+            long backOffTimems = ((int) Math.round(Math.pow(2, n)) * 1000)
+                    + (r.nextInt(high - low) + low);
+            return new LockResult(false, backOffTimems);
         }
-        return new LockResult(request.id,lockId,true,null);
+        return new LockResult(true, request.getId(),lockId,true,null);
     }
 
     private void recoverFromFailureAndUpdateDag(Dag latestDag,List<MusicRangeInformationRow> rows,List<Range> ranges,
@@ -2197,7 +2149,8 @@ public class MusicMixin implements MusicInterface {
         return newRow;
     }
 
-    private OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
+    @Override
+    public OwnershipReturn mergeLatestRows(Dag extendedDag, List<MusicRangeInformationRow> latestRows, List<Range> ranges,
                                             Map<UUID,LockResult> locks, UUID ownershipId) throws MDBCServiceException{
         recoverFromFailureAndUpdateDag(extendedDag,latestRows,ranges,locks);
         List<MusicRangeInformationRow> changed = setReadOnlyAnyDoubleRow(extendedDag, latestRows,locks);
@@ -2221,119 +2174,7 @@ public class MusicMixin implements MusicInterface {
         return new OwnershipReturn(ownershipId, ownRow.getOwnerId(), ownRow.getIndex(),ranges,extendedDag);
     }
 
-    // \TODO merge with dag code
-    private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException {
-        Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>();
-        for(MusicRangeInformationRow row : rows){
-            DatabasePartition dbPartition = row.getDBPartition();
-            if (row.getIsLatest()) {
-                for(Range range : dbPartition.getSnapshot()){
-                    if(!rowsPerLatestRange.containsKey(range)){
-                        rowsPerLatestRange.put(range,new HashSet<>());
-                    }
-                    DagNode node = dag.getNode(row.getPartitionIndex());
-                    if(node!=null) {
-                        rowsPerLatestRange.get(range).add(node);
-                    }
-                    else{
-                        rowsPerLatestRange.get(range).add(new DagNode(row));
-                    }
-                }
-            }
-        }
-        return rowsPerLatestRange;
-    }
-
-    /**
-     * Take locking ownership of each range
-     * @param ranges - ranges that need to be owned
-     * @param partition - current partition owned
-     * @param opId
-     */
-    @Override
-    public OwnershipReturn own(List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException {
-       
-       if(ranges == null || ranges.isEmpty()) {
-              return null;
-       }
-
-       Map<UUID,LockResult> newLocks = new HashMap<>();
-        //Init timeout clock
-        ownAndCheck.startOwnershipTimeoutClock(opId);
-        if(partition.isLocked()&&partition.getSnapshot().containsAll(ranges)) {
-            return new OwnershipReturn(opId,partition.getLockId(),partition.getMRIIndex(),partition.getSnapshot(),null);
-        }
-        //Find
-        List<Range> rangesToOwn = getRangeDependencies(ranges);
-        List<MusicRangeInformationRow> allMriRows = getAllMriRows();
-        List<MusicRangeInformationRow> rows = ownAndCheck.extractRowsForRange(allMriRows,rangesToOwn, false);
-        Dag toOwn =  Dag.getDag(rows,rangesToOwn);
-        Dag currentlyOwn = new Dag();
-        while( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) &&
-                !ownAndCheck.timeout(opId)
-            ){
-            takeOwnershipOfDag(partition, opId, newLocks, toOwn);
-            currentlyOwn=toOwn;
-            //TODO instead of comparing dags, compare rows
-            allMriRows = getAllMriRows();
-            rows = ownAndCheck.extractRowsForRange(allMriRows,rangesToOwn,false);
-            toOwn =  Dag.getDag(rows,rangesToOwn);
-        }
-        if(!currentlyOwn.isOwned() || toOwn.isDifferent(currentlyOwn)){
-           releaseLocks(newLocks);
-           ownAndCheck.stopOwnershipTimeoutClock(opId);
-           logger.error("Error when owning a range: Timeout");
-           throw new MDBCServiceException("Ownership timeout");
-        }
-        Set<Range> allRanges = currentlyOwn.getAllRanges();
-        List<MusicRangeInformationRow> isLatestRows = ownAndCheck.extractRowsForRange(allMriRows, new ArrayList<>(allRanges), true);
-        currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,isLatestRows));
-        return mergeLatestRows(currentlyOwn,rows,ranges,newLocks,opId);
-    }
-
-    /**
-     * Step through dag and take lock ownership of each range
-     * @param partition
-     * @param opId
-     * @param newLocks
-     * @param toOwn
-     * @throws MDBCServiceException
-     */
-    private void takeOwnershipOfDag(DatabasePartition partition, UUID opId, Map<UUID, LockResult> newLocks, Dag toOwn)
-            throws MDBCServiceException {
-        while(toOwn.hasNextToOwn()){
-            DagNode node = toOwn.nextToOwn();
-            MusicRangeInformationRow row = node.getRow();
-            UUID uuid = row.getPartitionIndex();
-            if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)||
-                newLocks.containsKey(uuid) ||
-                !row.getIsLatest()){
-                toOwn.setOwn(node);
-            }
-            else{
-                LockResult lockResult = null;
-                boolean owned = false;
-                while(!owned && !ownAndCheck.timeout(opId)){
-                    try {
-                        LockRequest request = new LockRequest(musicRangeInformationTableName,uuid,
-                            new ArrayList(node.getRangeSet()));
-                        lockResult = waitForLock(request);
-                        owned = true;
-                    }
-                    catch (MDBCServiceException e){
-                        logger.warn("Locking failed, retrying",e);
-                    }
-                }
-                if(owned){
-                    toOwn.setOwn(node);
-                    newLocks.put(uuid,lockResult);
-                }
-                else{
-                    break;
-                }
-            }
-        }
-    }
+       
 
     /**
      * This function is used to check if we need to create a new row in MRI, beacause one of the new ranges is not contained
@@ -2578,11 +2419,6 @@ public class MusicMixin implements MusicInterface {
             executeMusicLockedDelete(music_ns,musicRangeInformationTableName,rows.getKey().toString(),rows.getValue());
         }
     }
-
-    @Override
-    public OwnershipAndCheckpoint getOwnAndCheck(){
-        return ownAndCheck;
-    }
     
     @Override
     public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException{
index f72b0ec..ddf26ce 100644 (file)
@@ -29,10 +29,14 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang3.tuple.Pair;
 import org.onap.music.exceptions.MDBCServiceException;
 import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.Range;
 import org.onap.music.mdbc.mixins.DBInterface;
+import org.onap.music.mdbc.mixins.LockRequest;
 import org.onap.music.mdbc.mixins.LockResult;
 import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
+import org.onap.music.mdbc.mixins.MusicMixin;
 import org.onap.music.mdbc.tables.MriReference;
 import org.onap.music.mdbc.tables.MusicRangeInformationRow;
 import org.onap.music.mdbc.tables.MusicTxDigestId;
@@ -84,14 +88,8 @@ public class OwnershipAndCheckpoint{
         return false;
     }
 
-    /**
-     * Extracts all the rows that match any of the ranges.
-     * @param allMriRows
-     * @param ranges - ranges interested in
-     * @param onlyIsLatest - only return the "latest" rows
-     * @return
-     */
-    public List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, List<Range> ranges,
+    
+    private List<MusicRangeInformationRow> extractRowsForRange(List<MusicRangeInformationRow> allMriRows, List<Range> ranges,
                                                   boolean onlyIsLatest){
         List<MusicRangeInformationRow> rows = new ArrayList<>();
         for(MusicRangeInformationRow row : allMriRows){
@@ -114,6 +112,13 @@ public class OwnershipAndCheckpoint{
         return rows;
     }
 
+    /**
+     * Extracts all the rows that match any of the ranges.
+     * @param allMriRows
+     * @param ranges - ranges interested in
+     * @param onlyIsLatest - only return the "latest" rows
+     * @return
+     */
     private List<MusicRangeInformationRow> extractRowsForRange(MusicInterface music, List<Range> ranges, boolean onlyIsLatest)
         throws MDBCServiceException {
         final List<MusicRangeInformationRow> allMriRows = music.getAllMriRows();
@@ -248,4 +253,141 @@ public class OwnershipAndCheckpoint{
 
     }
 
+    /**
+     * Use this functions to verify ownership, and taking locking ownership of new ranges
+     * @param ranges the ranges that should be own after calling this function
+     * @param partition current information of the ownership in the system
+     * @param ownOpId is the id used to describe this ownership operation (it is not used to create the new row, if any is
+     *                required
+     * @return an object indicating the status of the own function result
+     * @throws MDBCServiceException
+     */
+    public OwnershipReturn own(MusicInterface mi, List<Range> ranges, DatabasePartition partition, UUID opId) throws MDBCServiceException {
+        
+        if(ranges == null || ranges.isEmpty()) {
+              return null;
+        }
+
+        Map<UUID,LockResult> newLocks = new HashMap<>();
+        //Init timeout clock
+        startOwnershipTimeoutClock(opId);
+        if(partition.isLocked()&&partition.getSnapshot().containsAll(ranges)) {
+            return new OwnershipReturn(opId,partition.getLockId(),partition.getMRIIndex(),partition.getSnapshot(),null);
+        }
+        //Find
+        List<Range> rangesToOwn = mi.getRangeDependencies(ranges);
+        List<MusicRangeInformationRow> rows = extractRowsForRange(mi,rangesToOwn, false);
+        Dag toOwn =  Dag.getDag(rows,rangesToOwn);
+        Dag currentlyOwn = new Dag();
+        while( (toOwn.isDifferent(currentlyOwn) || !currentlyOwn.isOwned() ) &&
+                !timeout(opId)
+            ){
+            takeOwnershipOfDag(mi, partition, opId, newLocks, toOwn);
+            currentlyOwn=toOwn;
+            //TODO instead of comparing dags, compare rows
+            rows = extractRowsForRange(mi, rangesToOwn, false);
+            toOwn =  Dag.getDag(rows,rangesToOwn);
+        }
+        if(!currentlyOwn.isOwned() || toOwn.isDifferent(currentlyOwn)){
+           mi.releaseLocks(newLocks);
+           stopOwnershipTimeoutClock(opId);
+           logger.error("Error when owning a range: Timeout");
+           throw new MDBCServiceException("Ownership timeout");
+        }
+        Set<Range> allRanges = currentlyOwn.getAllRanges();
+        List<MusicRangeInformationRow> isLatestRows = extractRowsForRange(mi, new ArrayList<>(allRanges), true);
+        currentlyOwn.setRowsPerLatestRange(getIsLatestPerRange(toOwn,isLatestRows));
+        return mi.mergeLatestRows(currentlyOwn,rows,ranges,newLocks,opId);
+    }
+    
+    /**
+     * Step through dag and take lock ownership of each range
+     * @param partition
+     * @param opId
+     * @param newLocks
+     * @param toOwn
+     * @throws MDBCServiceException
+     */
+    private void takeOwnershipOfDag(MusicInterface mi, DatabasePartition partition, UUID opId, Map<UUID, LockResult> newLocks, Dag toOwn)
+            throws MDBCServiceException {
+        while(toOwn.hasNextToOwn()){
+            DagNode node = toOwn.nextToOwn();
+            MusicRangeInformationRow row = node.getRow();
+            UUID uuid = row.getPartitionIndex();
+            if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)||
+                newLocks.containsKey(uuid) ||
+                !row.getIsLatest()){
+                toOwn.setOwn(node);
+            }
+            else{
+                LockRequest request = new LockRequest(MusicMixin.musicRangeInformationTableName,uuid,
+                        new ArrayList(node.getRangeSet()));
+                LockResult result = null;
+                boolean owned = false;
+                while(!owned && !timeout(opId)){
+                    try {
+                        result = mi.requestLock(request);
+                        if (result.wasSuccessful()) {
+                            owned = true;
+                            continue;
+                        }
+                        //backOff
+                        try {
+                            Thread.sleep(result.getBackOffPeriod());
+                        } catch (InterruptedException e) {
+                            continue;
+                        }
+                        request.incrementAttempts();
+                    }
+                    catch (MDBCServiceException e){
+                        logger.warn("Locking failed, retrying",e);
+                    }
+                }
+                if(owned){
+                    toOwn.setOwn(node);
+                    newLocks.put(uuid,result);
+                }
+                else{
+                    break;
+                }
+            }
+        }
+    }
+    
+    
+    public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
+        List<Range> snapshot = partition.getSnapshot();
+        UUID row = partition.getMRIIndex();
+        for(Range r : snapshot){
+            alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
+        }
+    }
+    
+    // \TODO merge with dag code
+    private Map<Range,Set<DagNode>> getIsLatestPerRange(Dag dag, List<MusicRangeInformationRow> rows) throws MDBCServiceException {
+        Map<Range,Set<DagNode>> rowsPerLatestRange = new HashMap<>();
+        for(MusicRangeInformationRow row : rows){
+            DatabasePartition dbPartition = row.getDBPartition();
+            if (row.getIsLatest()) {
+                for(Range range : dbPartition.getSnapshot()){
+                    if(!rowsPerLatestRange.containsKey(range)){
+                        rowsPerLatestRange.put(range,new HashSet<>());
+                    }
+                    DagNode node = dag.getNode(row.getPartitionIndex());
+                    if(node!=null) {
+                        rowsPerLatestRange.get(range).add(node);
+                    }
+                    else{
+                        rowsPerLatestRange.get(range).add(new DagNode(row));
+                    }
+                }
+            }
+        }
+        return rowsPerLatestRange;
+    }
+
+    public Map<Range, Pair<MriReference, Integer>> getAlreadyApplied() {
+        return this.alreadyApplied;
+    } 
+    
 }
index 03db7a7..4db3315 100644 (file)
@@ -78,7 +78,7 @@ public class MusicTxDigest {
                         warmupRanges.removeAll(partitionRanges);
                     }
                     try {
-                        mi.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+                        stateManager.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
                     } catch (MDBCServiceException e) {
                         logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
                         continue;
index df673c9..e8b7511 100644 (file)
@@ -50,6 +50,7 @@ import org.onap.music.main.MusicCore;
 import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.MDBCUtils;
 import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.StateManager;
 import org.onap.music.mdbc.TestUtils;
 import org.onap.music.mdbc.ownership.Dag;
 import org.onap.music.mdbc.ownership.DagNode;
@@ -69,6 +70,7 @@ public class MusicMixinTest {
     private static Session session;
     private static String cassaHost = "localhost";
     private static MusicMixin mixin = null;
+    private StateManager stateManager;
 
     @BeforeClass
     public static void init() throws MusicServiceException {
@@ -107,28 +109,29 @@ public class MusicMixinTest {
             Properties properties = new Properties();
             properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace);
             properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName);
-            mixin=new MusicMixin(mdbcServerName,properties);
+            mixin=new MusicMixin(stateManager, mdbcServerName,properties);
         } catch (MDBCServiceException e) {
             fail("error creating music mixin");
         }
 
     }
 
-    @Test(timeout=10000)
-    public void own() {
-        Range range = new Range("TABLE1");
-        List<Range> ranges = new ArrayList<>();
-        ranges.add(range);
-        final DatabasePartition partition = TestUtils.createBasicRow(range, mixin, mdbcServerName);
-        TestUtils.unlockRow(keyspace,mriTableName,partition);
-
-        DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
-        try {
-            mixin.own(ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey());
-        } catch (MDBCServiceException e) {
-            fail("failure when running own function");
-        }
-    }
+    //Own has been removed from musicMixin
+//    @Test(timeout=10000)
+//    public void own() {
+//        Range range = new Range("TABLE1");
+//        List<Range> ranges = new ArrayList<>();
+//        ranges.add(range);
+//        final DatabasePartition partition = TestUtils.createBasicRow(range, mixin, mdbcServerName);
+//        TestUtils.unlockRow(keyspace,mriTableName,partition);
+//
+//        DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
+//        try {
+//            mixin.own(ranges,currentPartition, MDBCUtils.generateTimebasedUniqueKey());
+//        } catch (MDBCServiceException e) {
+//            fail("failure when running own function");
+//        }
+//    }
 
     private DatabasePartition addRow(List<Range> ranges,boolean isLatest){
         final UUID uuid = MDBCUtils.generateTimebasedUniqueKey();
@@ -150,80 +153,81 @@ public class MusicMixinTest {
         return partition;
     }
 
-    @Test(timeout=10000)
-    public void own2() throws InterruptedException, MDBCServiceException {
-        List<Range> range12 = new ArrayList<>( Arrays.asList(
-            new Range("RANGE1"),
-            new Range("RANGE2")
-        ));
-        List<Range> range34 = new ArrayList<>( Arrays.asList(
-            new Range("RANGE3"),
-            new Range("RANGE4")
-        ));
-        List<Range> range24 = new ArrayList<>( Arrays.asList(
-            new Range("RANGE2"),
-            new Range("RANGE4")
-        ));
-        List<Range> range123 = new ArrayList<>( Arrays.asList(
-            new Range("RANGE1"),
-            new Range("RANGE2"),
-            new Range("RANGE3")
-        ));
-        DatabasePartition db1 = addRow(range12, false);
-        DatabasePartition db2 = addRow(range34, false);
-        MILLISECONDS.sleep(10);
-        DatabasePartition db3 = addRow(range12, true);
-        DatabasePartition db4 = addRow(range34, true);
-        MILLISECONDS.sleep(10);
-        DatabasePartition db5 = addRow(range24, true);
-        DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
-        MusicInterface.OwnershipReturn own = null;
-        try {
-            own = mixin.own(range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey());
-        } catch (MDBCServiceException e) {
-            fail("failure when running own function");
-        }
-        Dag dag = own.getDag();
-
-        DagNode node4 = dag.getNode(db4.getMRIIndex());
-        assertFalse(node4.hasNotIncomingEdges());
-        List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges());
-        assertEquals(1,outgoingEdges.size());
-
-        DagNode missing = outgoingEdges.get(0);
-        Set<Range> missingRanges = missing.getRangeSet();
-        assertEquals(2,missingRanges.size());
-        assertTrue(missingRanges.contains(new Range("RANGE1")));
-        assertTrue(missingRanges.contains(new Range("RANGE3")));
-        List<DagNode> outgoingEdges1 = missing.getOutgoingEdges();
-        assertEquals(1,outgoingEdges1.size());
-
-        DagNode finalNode = outgoingEdges1.get(0);
-        assertFalse(finalNode.hasNotIncomingEdges());
-        Set<Range> finalSet = finalNode.getRangeSet();
-        assertEquals(3,finalSet.size());
-        assertTrue(finalSet.contains(new Range("RANGE1")));
-        assertTrue(finalSet.contains(new Range("RANGE2")));
-        assertTrue(finalSet.contains(new Range("RANGE3")));
-
-        DagNode node5 = dag.getNode(db5.getMRIIndex());
-        List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges();
-        assertEquals(1,toRemoveOutEdges.size());
-        toRemoveOutEdges.remove(finalNode);
-        assertEquals(0,toRemoveOutEdges.size());
-
-        MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId());
-        assertTrue(row.getIsLatest());
-        DatabasePartition dbPartition = row.getDBPartition();
-        List<Range> snapshot = dbPartition.getSnapshot();
-        assertEquals(3,snapshot.size());
-        MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId());
-        assertFalse(node5row.getIsLatest());
-        MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex());
-        assertFalse(node4Row.getIsLatest());
-        MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex());
-        assertFalse(node3Row.getIsLatest());
-    }
+    //Own has been removed from musicMixin
+//    @Test(timeout=10000)
+//    public void own2() throws InterruptedException, MDBCServiceException {
+//        List<Range> range12 = new ArrayList<>( Arrays.asList(
+//            new Range("RANGE1"),
+//            new Range("RANGE2")
+//        ));
+//        List<Range> range34 = new ArrayList<>( Arrays.asList(
+//            new Range("RANGE3"),
+//            new Range("RANGE4")
+//        ));
+//        List<Range> range24 = new ArrayList<>( Arrays.asList(
+//            new Range("RANGE2"),
+//            new Range("RANGE4")
+//        ));
+//        List<Range> range123 = new ArrayList<>( Arrays.asList(
+//            new Range("RANGE1"),
+//            new Range("RANGE2"),
+//            new Range("RANGE3")
+//        ));
+//        DatabasePartition db1 = addRow(range12, false);
+//        DatabasePartition db2 = addRow(range34, false);
+//        MILLISECONDS.sleep(10);
+//        DatabasePartition db3 = addRow(range12, true);
+//        DatabasePartition db4 = addRow(range34, true);
+//        MILLISECONDS.sleep(10);
+//        DatabasePartition db5 = addRow(range24, true);
+//        DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
+//        MusicInterface.OwnershipReturn own = null;
+//        try {
+//            own = mixin.own(range123, currentPartition, MDBCUtils.generateTimebasedUniqueKey());
+//        } catch (MDBCServiceException e) {
+//            fail("failure when running own function");
+//        }
+//        Dag dag = own.getDag();
+//
+//        DagNode node4 = dag.getNode(db4.getMRIIndex());
+//        assertFalse(node4.hasNotIncomingEdges());
+//        List<DagNode> outgoingEdges = new ArrayList<>(node4.getOutgoingEdges());
+//        assertEquals(1,outgoingEdges.size());
+//
+//        DagNode missing = outgoingEdges.get(0);
+//        Set<Range> missingRanges = missing.getRangeSet();
+//        assertEquals(2,missingRanges.size());
+//        assertTrue(missingRanges.contains(new Range("RANGE1")));
+//        assertTrue(missingRanges.contains(new Range("RANGE3")));
+//        List<DagNode> outgoingEdges1 = missing.getOutgoingEdges();
+//        assertEquals(1,outgoingEdges1.size());
+//
+//        DagNode finalNode = outgoingEdges1.get(0);
+//        assertFalse(finalNode.hasNotIncomingEdges());
+//        Set<Range> finalSet = finalNode.getRangeSet();
+//        assertEquals(3,finalSet.size());
+//        assertTrue(finalSet.contains(new Range("RANGE1")));
+//        assertTrue(finalSet.contains(new Range("RANGE2")));
+//        assertTrue(finalSet.contains(new Range("RANGE3")));
+//
+//        DagNode node5 = dag.getNode(db5.getMRIIndex());
+//        List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges();
+//        assertEquals(1,toRemoveOutEdges.size());
+//        toRemoveOutEdges.remove(finalNode);
+//        assertEquals(0,toRemoveOutEdges.size());
+//
+//        MusicRangeInformationRow row = mixin.getMusicRangeInformation(own.getRangeId());
+//        assertTrue(row.getIsLatest());
+//        DatabasePartition dbPartition = row.getDBPartition();
+//        List<Range> snapshot = dbPartition.getSnapshot();
+//        assertEquals(3,snapshot.size());
+//        MusicRangeInformationRow node5row = mixin.getMusicRangeInformation(node5.getId());
+//        assertFalse(node5row.getIsLatest());
+//        MusicRangeInformationRow node4Row = mixin.getMusicRangeInformation(db4.getMRIIndex());
+//        assertFalse(node4Row.getIsLatest());
+//        MusicRangeInformationRow node3Row = mixin.getMusicRangeInformation(db3.getMRIIndex());
+//        assertFalse(node3Row.getIsLatest());
+//    }
 
 
     @Test
index 4950484..7db973c 100644 (file)
@@ -36,6 +36,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.onap.music.datastore.MusicDataStore;
 import org.onap.music.datastore.MusicDataStoreHandle;
 import org.onap.music.exceptions.MDBCServiceException;
@@ -45,6 +47,7 @@ import org.onap.music.lockingservice.cassandra.CassaLockStore;
 import org.onap.music.mdbc.DatabasePartition;
 import org.onap.music.mdbc.MDBCUtils;
 import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.StateManager;
 import org.onap.music.mdbc.TestUtils;
 import org.onap.music.mdbc.mixins.LockResult;
 import org.onap.music.mdbc.mixins.MusicInterface.OwnershipReturn;
@@ -79,6 +82,11 @@ public class OwnershipAndCheckpointTest {
     private static DB db;
        Connection conn;
        MySQLMixin mysqlMixin;
+       OwnershipAndCheckpoint ownAndCheck;
+       
+       @Mock
+       StateManager stateManager = Mockito.mock(StateManager.class);
+       
 
     @BeforeClass
     public static void init() throws MusicServiceException, ClassNotFoundException, ManagedProcessException {
@@ -141,9 +149,11 @@ public class OwnershipAndCheckpointTest {
             Properties properties = new Properties();
             properties.setProperty(MusicMixin.KEY_MY_ID,mdbcServerName);
             properties.setProperty(MusicMixin.KEY_MUSIC_NAMESPACE,keyspace);
-            musicMixin =new MusicMixin(mdbcServerName,properties);
+            //StateManager stateManager = new StateManager("dbUrl", properties, "serverName", "dbName");
+            ownAndCheck = new OwnershipAndCheckpoint();
+            musicMixin =new MusicMixin(stateManager, mdbcServerName,properties);
         } catch (MDBCServiceException e) {
-            fail("error creating music musicMixin");
+            fail("error creating music musicMixin " + e.getMessage());
         }
         this.conn = DriverManager.getConnection("jdbc:mariadb://localhost:"+sqlPort+"/"+DATABASE, "root", "");
         this.mysqlMixin = new MySQLMixin(musicMixin, "localhost:"+sqlPort+"/"+DATABASE, conn, null);
@@ -155,7 +165,7 @@ public class OwnershipAndCheckpointTest {
         String sqlOperation = "INSERT INTO "+TABLE+" (PersonID,LastName,FirstName,Address,City) VALUES "+
             "(1,'SAUREZ','ENRIQUE','GATECH','ATLANTA');";
         StagingTable stagingTable = new StagingTable();
-        musicMixin.reloadAlreadyApplied(partition);
+        ownAndCheck.reloadAlreadyApplied(partition);
         final Statement executeStatement = this.conn.createStatement();
         executeStatement.execute(sqlOperation);
         this.conn.commit();
@@ -170,12 +180,12 @@ public class OwnershipAndCheckpointTest {
 
     private OwnershipReturn cleanAndOwnPartition(List<Range> ranges, UUID ownOpId) throws SQLException {
         dropAndCreateTable();
-        musicMixin.cleanAlreadyApplied();
+        cleanAlreadyApplied(ownAndCheck);
         DatabasePartition currentPartition = new DatabasePartition(MDBCUtils.generateTimebasedUniqueKey());
 
         OwnershipReturn own=null;
         try {
-            own = musicMixin.own(ranges, currentPartition, ownOpId);
+            own = ownAndCheck.own(musicMixin, ranges, currentPartition, ownOpId);
         } catch (MDBCServiceException e) {
             fail("failure when running own function");
         }
@@ -207,7 +217,8 @@ public class OwnershipAndCheckpointTest {
     public void checkpoint() throws MDBCServiceException, SQLException {
         Range range =
             new Range(TABLE);
-        OwnershipAndCheckpoint ownAndCheck = musicMixin.getOwnAndCheck();
+        Mockito.when(stateManager.getOwnAndCheck()).thenReturn(this.ownAndCheck);
+
         initDatabase(range);
 
         List<Range> ranges = new ArrayList<>();
@@ -230,7 +241,8 @@ public class OwnershipAndCheckpointTest {
     //@Ignore
     public void warmup() throws MDBCServiceException, SQLException {
         Range range = new Range(TABLE);
-        OwnershipAndCheckpoint ownAndCheck = musicMixin.getOwnAndCheck();
+        Mockito.when(stateManager.getOwnAndCheck()).thenReturn(this.ownAndCheck);
+
         initDatabase(range);
 
         List<Range> ranges = new ArrayList<>();
@@ -249,4 +261,8 @@ public class OwnershipAndCheckpointTest {
 
         checkData();
     }
+    
+    private void cleanAlreadyApplied(OwnershipAndCheckpoint ownAndCheck) {
+        ownAndCheck.getAlreadyApplied().clear();
+    }
 }
\ No newline at end of file