//iterations
Map<Integer, List<Long>> results = new HashMap<>();
final int totalIterations = 20;
- final int[] rows = { 1,10,100, 500, 1000};
+ final int[] rows = { 1,10,50,80//};
+ ,100,200,300,400};//, 500};
+ //, 1000};
for(int row: rows) {
System.out.println("Running for rows: "+Integer.toString(row));
results.put(row,new ArrayList<Long>());
* This function is used to change the contents of this, with the contents of a different object
* @param otherPartition partition that is used to substitute the local contents
*/
- public void updateDatabasePartition(DatabasePartition otherPartition){
+ public synchronized void updateDatabasePartition(DatabasePartition otherPartition){
musicRangeInformationIndex = otherPartition.musicRangeInformationIndex;//Index that can be obtained either from
lockId = otherPartition.lockId;
ranges = otherPartition.ranges;
}
- public boolean isLocked(){return lockId != null && !lockId.isEmpty(); }
+ public synchronized boolean isLocked(){return lockId != null && !lockId.isEmpty(); }
- public boolean isReady() {
+ public synchronized boolean isReady() {
return ready;
}
- public void setReady(boolean ready) {
+ public synchronized void setReady(boolean ready) {
this.ready = ready;
}
- public UUID getMRIIndex() {
+ public synchronized UUID getMRIIndex() {
return musicRangeInformationIndex;
}
- public void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) {
+ public synchronized void setMusicRangeInformationIndex(UUID musicRangeInformationIndex) {
this.musicRangeInformationIndex = musicRangeInformationIndex;
}
return range;
}
- public String getLockId() {
+ public synchronized String getLockId() {
return lockId;
}
- public void setLockId(String lockId) {
+ public synchronized void setLockId(String lockId) {
this.lockId = lockId;
}
- public boolean isContained(Range range){
+ public synchronized boolean isContained(Range range){
for(Range r: ranges){
if(r.overlaps(range)){
return true;
DatabasePartition tempPartition = own(scQueryTables);
if(tempPartition!=null && tempPartition != partition) {
this.partition.updateDatabasePartition(tempPartition);
+ mi.reloadAlreadyApplied(this.partition);
}
dbi.preStatementHook(sql);
}
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
* \TODO Implement an interface for the server logic and a factory
/** Identifier for this server instance */
private String mdbcServerName;
private Map<String,DatabasePartition> connectionRanges;//Each connection owns its own database partition
+ private final Lock eventualLock = new ReentrantLock();
private List<Range> eventualRanges;
+ private final Lock warmupLock = new ReentrantLock();
+ private List<Range> warmupRanges;
public StateManager(String sqlDBUrl, Properties info, String mdbcServerName, String sqlDBName) throws MDBCServiceException {
this.sqlDBName = sqlDBName;
this.info = info;
this.mdbcServerName = mdbcServerName;
- this.connectionRanges = new HashMap<>();
+ this.connectionRanges = new ConcurrentHashMap<>();
this.transactionInfo = new TxCommitProgress();
//\fixme this might not be used, delete?
try {
return this.musicInterface;
}
- public List<DatabasePartition> getRanges() {
+ public List<DatabasePartition> getPartitions() {
return new ArrayList<>(connectionRanges.values());
}
+ public List<Range> getWarmupRanges(){
+ warmupLock.lock();
+ List<Range> returnArray;
+ try {
+ if(warmupRanges!=null) {
+ returnArray = new ArrayList<>(warmupRanges);
+ }
+ else{
+ returnArray = null;
+ }
+ }
+ finally{
+ warmupLock.unlock();
+ }
+ return returnArray;
+ }
public List<Range> getEventualRanges() {
- return eventualRanges;
+ eventualLock.lock();
+ List<Range> returnArray;
+ try {
+ if(eventualRanges!=null){
+ returnArray = new ArrayList<>(eventualRanges);
+ }
+ else{
+ returnArray= null;
+ }
+ }
+ finally{
+ eventualLock.unlock();
+ }
+ return returnArray;
}
public void setEventualRanges(List<Range> eventualRanges) {
- this.eventualRanges = eventualRanges;
+ eventualLock.lock();
+ try {
+ this.eventualRanges = eventualRanges;
+ }
+ finally{
+ eventualLock.unlock();
+ }
}
public void closeConnection(String connectionId){
}
}
+
+ public void setWarmupRanges(List<Range> warmupRanges) {
+ warmupLock.lock();
+ try {
+ this.warmupRanges = warmupRanges;
+ }
+ finally{
+ warmupLock.unlock();
+ }
+ }
}
private String internalNamespace;
private int internalReplicationFactor;
private String musicNamespace;
+ private int musicReplicationFactor;
private String tableToPartitionName;
private String partitionInformationTableName;
private String redoHistoryTableName;
*/
public List<NodeConfiguration> initializeAndCreateNodeConfigurations() throws MDBCServiceException {
logger.info("initializing the required spaces");
+ createKeyspaces();
initInternalNamespace();
List<NodeConfiguration> nodeConfigs = new ArrayList<>();
String partitionId;
if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){
- if(partitionInfo.replicationFactor==0){
- logger.error("Replication factor and partition id are both empty, and this is an invalid configuration" );
- throw new MDBCServiceException("Replication factor and partition id are both empty, and this is an invalid configuration");
- }
//1) Create a row in the partition info table
partitionId = MDBCUtils.generateTimebasedUniqueKey().toString();
}
return nodeConfigs;
}
+ private void createKeyspaces() throws MDBCServiceException {
+ MusicMixin.createKeyspace(internalNamespace,internalReplicationFactor);
+ MusicMixin.createKeyspace(musicNamespace,musicReplicationFactor);
+
+ }
+
private void checkIfMriIsEmpty(String mriTableName) throws MDBCServiceException {
//First check if table exists
StringBuilder checkTableExistsString = new StringBuilder("SELECT table_name FROM system_schema.tables WHERE keyspace_name='")
private String mriTableName;
private String mtxdTableName;
private String partitionId;
- private int replicationFactor;
public List<Range> getTables() {
return tables;
this.partitionId = partitionId;
}
- public int getReplicationFactor() {
- return replicationFactor;
- }
-
- public void setReplicationFactor(int replicationFactor) {
- this.replicationFactor = replicationFactor;
- }
-
public String getMtxdTableName(){
return mtxdTableName;
}
"owner": "",
"mriTableName": "musicrangeinformation",
"mtxdTableName": "musictxdigest",
- "partitionId": "",
- "replicationFactor": 1
+ "partitionId": ""
}
],
+ "internalNamespace": "music_internal",
+ "internalReplicationFactor": 1,
"musicNamespace": "namespace",
+ "musicReplicationFactor": 1,
"tableToPartitionName": "tabletopartition",
"partitionInformationTableName": "partitioninfo",
"redoHistoryTableName": "redohistory",
- "sqlDatabaseName": "test",
- "internalNamespace": "music_internal",
- "internalReplicationFactor": 1
+ "sqlDatabaseName": "test"
}
OwnershipAndCheckpoint getOwnAndCheck();
-
ArrayList<HashMap<Range, StagingTable>> getEveTxDigest() throws MDBCServiceException;
+
+ void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException;
}
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
import org.onap.music.mdbc.ownership.Dag;
import org.onap.music.mdbc.ownership.DagNode;
import org.onap.music.mdbc.ownership.OwnershipAndCheckpoint;
-import org.onap.music.mdbc.tables.MusicRangeInformationRow;
-import org.onap.music.mdbc.tables.MusicTxDigestId;
-import org.onap.music.mdbc.tables.RangeDependency;
-import org.onap.music.mdbc.tables.StagingTable;
-import org.onap.music.mdbc.tables.TxCommitProgress;
+import org.onap.music.mdbc.tables.*;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
private boolean keyspace_created = false;
private Map<String, PreparedStatement> ps_cache = new HashMap<>();
private Set<String> in_progress = Collections.synchronizedSet(new HashSet<String>());
- private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied;
+ private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
private OwnershipAndCheckpoint ownAndCheck;
public MusicMixin() {
String t = info.getProperty(KEY_TIMEOUT);
this.timeout = (t == null) ? DEFAULT_TIMEOUT : Integer.parseInt(t);
- alreadyApplied = new HashMap<>();
+ alreadyApplied = new ConcurrentHashMap<>();
ownAndCheck = new OwnershipAndCheckpoint(alreadyApplied,timeout);
initializeMetricTables();
*/
@Override
public void createKeyspace() throws MDBCServiceException {
+ createKeyspace(this.music_ns,this.music_rfactor);
+ }
+ public static void createKeyspace(String keyspace, int replicationFactor) throws MDBCServiceException {
Map<String,Object> replicationInfo = new HashMap<>();
replicationInfo.put("'class'", "'SimpleStrategy'");
- replicationInfo.put("'replication_factor'", music_rfactor);
+ replicationInfo.put("'replication_factor'", replicationFactor);
PreparedQueryObject queryObject = new PreparedQueryObject();
queryObject.appendQueryString(
- "CREATE KEYSPACE IF NOT EXISTS " + this.music_ns +
+ "CREATE KEYSPACE IF NOT EXISTS " + keyspace +
" WITH REPLICATION = " + replicationInfo.toString().replaceAll("=", ":"));
try {
MusicCore.nonKeyRelatedPut(queryObject, "eventual");
} catch (MusicServiceException e) {
- if (!e.getMessage().equals("Keyspace "+music_ns+" already exists")) {
- throw new MDBCServiceException("Error creating namespace: "+music_ns+". Internal error:"+e.getErrorMessage(), e);
+ if (!e.getMessage().equals("Keyspace "+keyspace+" already exists")) {
+ throw new MDBCServiceException("Error creating namespace: "+keyspace+". Internal error:"+e.getErrorMessage(),
+ e);
}
}
}
//0. See if reference to lock was already created
String lockId = partition.getLockId();
if(mriIndex==null || lockId == null || lockId.isEmpty()) {
+ //\TODO fix this
own(partition.getSnapshot(),partition, MDBCUtils.generateTimebasedUniqueKey());
}
} catch (IOException e) {
throw new MDBCServiceException("Failed to serialized transaction digest with error " + e.toString(), e);
}
- MusicTxDigestId digestId = new MusicTxDigestId(commitId, -1);
+ MusicTxDigestId digestId = new MusicTxDigestId(mriIndex, -1);
addTxDigest(digestId, serializedTransactionDigest);
//2. Save RRT index to RQ
if (progressKeeper != null) {
}
//3. Append RRT index into the corresponding TIT row array
appendToRedoLog(partition, digestId);
+ List<Range> ranges = partition.getSnapshot();
+ for(Range r : ranges) {
+ if(!alreadyApplied.containsKey(r)){
+ throw new MDBCServiceException("already applied data structure was not updated correctly and range "
+ +r+" is not contained");
+ }
+ Pair<MriReference, Integer> rowAndIndex = alreadyApplied.get(r);
+ MriReference key = rowAndIndex.getKey();
+ if(!mriIndex.equals(key.index)){
+ throw new MDBCServiceException("already applied data structure was not updated correctly and range "+
+ r+" is not pointing to row: "+mriIndex.toString());
+ }
+ alreadyApplied.put(r, Pair.of(new MriReference(mriIndex), rowAndIndex.getValue()+1));
+ }
}
}
return ecDigestList;
}
+ @Override
+ public void reloadAlreadyApplied(DatabasePartition partition) throws MDBCServiceException {
+ List<Range> snapshot = partition.getSnapshot();
+ UUID row = partition.getMRIIndex();
+ for(Range r : snapshot){
+ alreadyApplied.put(r,Pair.of(new MriReference(row),-1));
+ }
+
+ }
+
-
ResultSet getAllMriCassandraRows() throws MDBCServiceException {
StringBuilder cqlOperation = new StringBuilder();
cqlOperation.append("SELECT * FROM ")
StringBuilder keyCondStmt = new StringBuilder();
String and = "";
for (String key: primaryKeys.keySet()) {
- Object val = primaryKeys.get(key);
- keyCondStmt.append(and + key + "=\"" + val + "\"");
- and = " AND ";
+ // We cannot use the default primary key for the sql table and operations
+ if(!key.equals(mi.getMusicDefaultPrimaryKeyName())) {
+ Object val = primaryKeys.get(key);
+ keyCondStmt.append(and + key + "=\"" + val + "\"");
+ and = " AND ";
+ }
}
return keyCondStmt.toString();
}
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MriRowComparator;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
return toApplyNodes.isEmpty();
}
- public void setAlreadyApplied(Map<Range, Pair<MusicRangeInformationRow,Integer>> alreadyApplied, Set<Range> ranges)
+ public void setAlreadyApplied(Map<Range, Pair<MriReference,Integer>> alreadyApplied, Set<Range> ranges)
throws MDBCServiceException {
for(Map.Entry<UUID,DagNode> node : nodes.entrySet()){
Set<Range> intersection = new HashSet<>(ranges);
intersection.retainAll(node.getValue().getRangeSet());
for(Range r : intersection){
if(alreadyApplied.containsKey(r)){
- final Pair<MusicRangeInformationRow, Integer> appliedPair = alreadyApplied.get(r);
- final MusicRangeInformationRow appliedRow = appliedPair.getKey();
+ final Pair<MriReference, Integer> appliedPair = alreadyApplied.get(r);
+ final MriReference appliedRow = appliedPair.getKey();
final int index = appliedPair.getValue();
final long appliedTimestamp = appliedRow.getTimestamp();
final long nodeTimestamp = node.getValue().getTimestamp();
import org.onap.music.mdbc.mixins.DBInterface;
import org.onap.music.mdbc.mixins.LockResult;
import org.onap.music.mdbc.mixins.MusicInterface;
+import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
import org.onap.music.mdbc.tables.StagingTable;
private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(OwnershipAndCheckpoint.class);
private Lock checkpointLock;
private AtomicBoolean change;
- private Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied;
+ private Map<Range, Pair<MriReference, Integer>> alreadyApplied;
private Map<UUID,Long> ownershipBeginTime;
private long timeoutInMs;
this(new HashMap<>(),Long.MAX_VALUE);
}
- public OwnershipAndCheckpoint(Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied, long timeoutInMs){
+ public OwnershipAndCheckpoint(Map<Range, Pair<MriReference, Integer>> alreadyApplied, long timeoutInMs){
change = new AtomicBoolean(true);
checkpointLock = new ReentrantLock();
this.alreadyApplied = alreadyApplied;
public void checkpoint(MusicInterface mi, DBInterface di, Dag extendedDag, List<Range> ranges,
Map<MusicRangeInformationRow, LockResult> locks, UUID ownOpId) throws MDBCServiceException {
+ if(ranges.isEmpty()){
+ return;
+ }
try {
checkpointLock.lock();
change.set(true);
}
public void warmup(MusicInterface mi, DBInterface di, List<Range> ranges) throws MDBCServiceException {
+ if(ranges.isEmpty()){
+ return;
+ }
boolean ready = false;
change.set(true);
Set<Range> rangeSet = new HashSet<Range>(ranges);
final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey());
applyTxDigest(di, txDigest);
for (Range r : pair.getValue()) {
- alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index));
+ MusicRangeInformationRow row = node.getRow();
+ alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
}
}
pair = node.nextNotAppliedTransaction(rangeSet);
final HashMap<Range, StagingTable> txDigest = mi.getTxDigest(pair.getKey());
applyTxDigest(db, txDigest);
for (Range r : pair.getValue()) {
- alreadyApplied.put(r, Pair.of(node.getRow(), pair.getKey().index));
+ MusicRangeInformationRow row = node.getRow();
+ alreadyApplied.put(r, Pair.of(new MriReference(row.getPartitionIndex()), pair.getKey().index));
}
pair = node.nextNotAppliedTransaction(rangeSet);
if (timeout(ownOpId)) {
this.index= index;
}
+ public long getTimestamp() { return index.timestamp();}
+
}
import java.sql.SQLException;
import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
continue;
}
//2) for each partition I don't own
- List<DatabasePartition> ranges = stateManager.getRanges();
- if(ranges.size()!=0) {
- DatabasePartition myPartition = ranges.get(0);
- for (UUID partition : partitions) {
- if (!partition.equals(myPartition.getMRIIndex())) {
- try {
- //replayDigestForPartition(mi, partition, dbi);
- mi.getOwnAndCheck().warmup(mi, dbi, myPartition.getSnapshot());
- } catch (MDBCServiceException e) {
- logger.error("Unable to update for partition : " + partition + ". " + e.getMessage());
- continue;
- }
- }
- }
- }
-
- //Step 3: ReplayDigest() for E.C conditions
+ final List<Range> warmuplist = stateManager.getWarmupRanges();
+ if(warmuplist!=null) {
+ final Set<Range> warmupRanges = new HashSet(warmuplist);
+ final List<DatabasePartition> currentPartitions = stateManager.getPartitions();
+ List<Range> missingRanges = new ArrayList<>();
+ if (currentPartitions.size() != 0) {
+ for (DatabasePartition part : currentPartitions) {
+ List<Range> partitionRanges = part.getSnapshot();
+ warmupRanges.removeAll(partitionRanges);
+ }
+ try {
+ mi.getOwnAndCheck().warmup(mi, dbi, new ArrayList<>(warmupRanges));
+ } catch (MDBCServiceException e) {
+ logger.error("Unable to update for partition : " + warmupRanges + ". " + e.getMessage());
+ continue;
+ }
+ }
+ }
+
+ //Step 3: ReplayDigest() for E.C conditions
try {
replayDigest(mi,dbi);
} catch (MDBCServiceException e) {
cassandra.host =\
- 143.215.128.49
+ 192.168.1.19
cassandra.user =\
metric
cassandra.password =\
import org.onap.music.mdbc.DatabasePartition;
import org.onap.music.mdbc.MDBCUtils;
import org.onap.music.mdbc.Range;
+import org.onap.music.mdbc.tables.MriReference;
import org.onap.music.mdbc.tables.MusicRangeInformationRow;
import org.onap.music.mdbc.tables.MusicTxDigestId;
@Test
public void nextToApply2() throws InterruptedException, MDBCServiceException {
- Map<Range, Pair<MusicRangeInformationRow, Integer>> alreadyApplied = new HashMap<>();
+ Map<Range, Pair<MriReference, Integer>> alreadyApplied = new HashMap<>();
List<MusicRangeInformationRow> rows = new ArrayList<>();
List<Range> ranges = new ArrayList<>( Arrays.asList(
new Range("range1")
new MusicTxDigestId(MDBCUtils.generateUniqueKey(),1)
));
MusicRangeInformationRow newRow = createNewRow(new ArrayList<>(ranges), "", false, redo2);
- alreadyApplied.put(new Range("range1"),Pair.of(newRow, 0));
+ alreadyApplied.put(new Range("range1"),Pair.of(new MriReference(newRow.getPartitionIndex()), 0));
rows.add(newRow);
MILLISECONDS.sleep(10);
List<MusicTxDigestId> redo3 = new ArrayList<>(Arrays.asList(