mdbc-server/src/main/java/org/onap/music/mdbc/tools/CreateNodeConfiguration.java
To run it, use the following parameters:
-
+``
-t mdbc-server/src/main/java/org/onap/music/mdbc/configurations/tableConfiguration.json -b base -o /Users/quique/Desktop/
This program is going to generate all the required configuration json for each ETDB node in the system and additionally initialize all the corresponding rows and tables for the system to correctly work. The meaning of the parameters is:
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ============LICENSE_START==========================================
+ org.onap.music
+ ===================================================================
+ Copyright (c) 2018 AT&T Intellectual Property
+ ===================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=============================================
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>mdbc</artifactId>
+ <groupId>org.onap.music.mdbc</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>mdbc-benchmark</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>mdbc-benchmark</name>
+ <description>MDBC Benchmark</description>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>20160810</version>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>5.1.32</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.7</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.vmlens</groupId>
+ <artifactId>concurrent-junit</artifactId>
+ <version>1.0.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.att.eelf</groupId>
+ <artifactId>eelf-core</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica-server</artifactId>
+ <version>1.12.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.8.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mariadb.jdbc</groupId>
+ <artifactId>mariadb-java-client</artifactId>
+ <version>1.1.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>3.0</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.1.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>node-configuration</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.onap.music.mdbc.Benchmark</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <finalName>mdbc-benchmark</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
--- /dev/null
+/*
+ * ============LICENSE_START====================================================
+ * org.onap.music.mdbc
+ * =============================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END======================================================
+ */
+
+package org.onap.music.mdbc;
+
+import java.sql.*;
+import java.util.*;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.commons.math3.stat.descriptive.rank.Percentile;
+
+public class Benchmark
+{
+
+ @Parameter(names = { "-m", "--ismariadb" },
+ description = "Is MARIADB evaluation")
+ private boolean isMariaDb = false;
+
+
+ @Parameter(names = { "-h", "-help", "--help" }, help = true,
+ description = "Print the help message")
+ private boolean help = false;
+
+ public static class MyState {
+ private Connection createConnection(){
+ try {
+ Class.forName(this.driver);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ Connection connection=null;
+ try {
+ if(!isMariaDb) {
+ connection = DriverManager.getConnection(connectionUrl);
+ }
+ else{
+ Properties connectionProps = new Properties();
+ connectionProps.put("user", user);
+ connectionProps.put("password", password);
+ connection = DriverManager.getConnection(connectionUrl,connectionProps);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ try {
+ connection.setAutoCommit(false);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ return connection;
+ }
+
+ private void createTable(Connection connection){
+ final String sql = "CREATE TABLE IF NOT EXISTS PERSONS (\n" +
+ " PersonID int,\n" +
+ " Counter int,\n" +
+ " LastName varchar(255),\n" +
+ " FirstName varchar(255),\n" +
+ " Address varchar(255),\n" +
+ " City varchar(255)\n" +
+ ");";
+
+ Statement stmt = null;
+ try {
+ stmt = connection.createStatement();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ Boolean execute=null;
+ try {
+ execute = stmt.execute(sql);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ try {
+ connection.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ private boolean cleanTable(){
+ String cleanCmd = "DELETE FROM PERSONS;";
+ Statement stmt = null;
+ try {
+ stmt = testConnection.createStatement();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ Boolean execute=null;
+ try {
+ execute = stmt.execute(cleanCmd);
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ try {
+ testConnection.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ return execute;
+ }
+
+ private void addRowsToTable(int totalNumberOfRows){
+ for(int i=0; i<totalNumberOfRows; i++) {
+ final StringBuilder insertSQLBuilder = new StringBuilder()
+ .append("INSERT INTO PERSONS VALUES (")
+ .append(i)
+ .append(", ")
+ .append(0)
+ .append(", '")
+ .append("Last-")
+ .append(i)
+ .append("', '")
+ .append("First-")
+ .append(i)
+ .append("', 'KACB', 'ATLANTA');");
+ Statement stmt = null;
+ try {
+ stmt = testConnection.createStatement();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ Boolean execute = null;
+ try {
+ execute = stmt.execute(insertSQLBuilder.toString());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+ try {
+ testConnection.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ public void doSetup() {
+ System.out.println("Do Global Setup");
+ Connection connection = createConnection();
+ createTable(connection);
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ public void doWarmup(int rows) {
+ System.out.println("Do Setup");
+ //Setup connection
+ testConnection = createConnection();
+
+ //Empty database
+ boolean cleanResult = cleanTable();
+
+ //Add new lines
+ addRowsToTable(rows);
+
+ //Commit
+ try {
+ testConnection.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ }
+
+ public void doTearDown() {
+ System.out.println("Do TearDown");
+ try {
+ testConnection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ boolean isMariaDb = false;
+ String user = "root";
+ String password = "metriccluster";
+ public String driver = "org.apache.calcite.avatica.remote.Driver";
+ //public final String driver = "org.mariadb.jdbc.Driver";
+
+ public String connectionUrl = "jdbc:avatica:remote:url=http://localhost:30000;serialization=protobuf";
+ //public final String connectionUrl = "jdbc:mariadb://localhost:3306/test";
+
+ public Connection testConnection;
+ }
+
+ public static void testMethod(MyState state) {
+ //UPDATE table_name
+ //SET column1 = value1, column2 = value2, ...
+ //WHERE condition;
+ final StringBuilder updateBuilder = new StringBuilder()
+ .append("UPDATE PERSONS ")
+ .append("SET Counter = Counter + 1,")
+ .append("City = 'Sandy Springs'")
+ .append(";");
+ Statement stmt = null;
+ try {
+ stmt = state.testConnection.createStatement();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+
+ Boolean execute = null;
+ try {
+ execute = stmt.execute(updateBuilder.toString());
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ try {
+ state.testConnection.commit();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ private static void printResults(Map<Integer,List<Long>> results){
+ System.out.println("ROWS,P1,P5,P10,P25,P50,P75,P90,P95,P99,AVG,DEVIATION");
+ for(Map.Entry<Integer,List<Long>> result : results.entrySet()) {
+ Percentile resultsPercentiles = new Percentile();
+ double[] tmpList= new double[result.getValue().size()];
+ int counter = 0;
+ for(Long val : result.getValue()) {
+ tmpList[counter++]= Double.valueOf(val);
+ }
+ resultsPercentiles.setData(tmpList);
+ final double average = result.getValue()
+ .stream()
+ .mapToDouble((x) -> x.doubleValue())
+ .summaryStatistics()
+ .getAverage();
+
+ final double rawSum = result.getValue()
+ .stream()
+ .mapToDouble((x) -> Math.pow(x.doubleValue() - average,
+ 2.0))
+ .sum();
+
+ final double deviation = Math.sqrt(rawSum / (result.getValue().size() - 1));
+ System.out.printf("%d,%f,%f,%f,%f,%f,%f,%f,%f,%f,%f,%f\n",
+ result.getKey(),
+ resultsPercentiles.evaluate(1.0),
+ resultsPercentiles.evaluate(5.0),
+ resultsPercentiles.evaluate(10.0),
+ resultsPercentiles.evaluate(25.0),
+ resultsPercentiles.evaluate(50.0),
+ resultsPercentiles.evaluate(75.0),
+ resultsPercentiles.evaluate(90.0),
+ resultsPercentiles.evaluate(95.0),
+ resultsPercentiles.evaluate(99.0),
+ average,
+ deviation
+ );
+ }
+ }
+
+ public static void main( String[] args )
+ {
+ final Benchmark testApp = new Benchmark();
+ @SuppressWarnings("deprecation")
+ JCommander jc = new JCommander(testApp, args);
+ if (testApp.help) {
+ jc.usage();
+ System.exit(1);
+ return;
+ }
+ MyState state = new MyState();
+ if(testApp.isMariaDb){
+ state.isMariaDb = true;
+ state.driver = "org.mariadb.jdbc.Driver";
+ state.connectionUrl = "jdbc:mariadb://192.168.1.30:3306/test";
+
+ }
+ else{
+ state.isMariaDb = false;
+ state.driver = "org.apache.calcite.avatica.remote.Driver";
+ state.connectionUrl = "jdbc:avatica:remote:url=http://localhost:30000;serialization=protobuf";
+
+ }
+ //iterations
+ Map<Integer, List<Long>> results = new HashMap<>();
+ final int totalIterations = 20;
+ final int[] rows = { 1,10,100, 500, 1000};
+ for(int row: rows) {
+ System.out.println("Running for rows: "+Integer.toString(row));
+ results.put(row,new ArrayList<Long>());
+ for (int i = 0; i < totalIterations; i++) {
+ System.out.println("Running iteration: "+Integer.toString(i));
+ state.doSetup();
+ state.doWarmup(row);
+ long startTime = System.nanoTime();
+ testMethod(state);
+ long endTime = System.nanoTime();
+ results.get(row).add(endTime - startTime);
+ state.doTearDown();
+ }
+ }
+ printResults(results);
+ }
+}
<goal>single</goal>
</goals>
</execution>
+ <execution>
+ <id>node-configuration</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.onap.music.mdbc.tools.CreateNodeConfigurations</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <finalName>create-node-configuration</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ </configuration>
+ </execution>
+ <execution>
+ <id>mdbc-server</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>org.onap.music.mdbc.MdbcServer</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <finalName>mdbc-server</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
Map<String, List<String>> tableToInstruction = QueryProcessor.extractTableFromQuery(sql);
//Check ownership of keys
List<Range> queryTables = MDBCUtils.getTables(tableToInstruction);
+ if(this.partition!=null ){
+ List<Range> snapshot = this.partition.getSnapshot();
+ if(snapshot!=null){
+ queryTables.addAll(snapshot);
+ }
+ }
// filter out ranges that fall under Eventually consistent
// category as these tables do not need ownership
List<Range> scQueryTables = filterEveTables( queryTables);
- this.partition = own(scQueryTables);
- dbi.preStatementHook(sql);
+ DatabasePartition tempPartition = own(scQueryTables);
+ if(tempPartition!=null && tempPartition != partition) {
+ this.partition.updateDatabasePartition(tempPartition);
+ }
+ dbi.preStatementHook(sql);
}
}
private DatabasePartition own(List<Range> ranges) throws MDBCServiceException {
+ if(ranges==null||ranges.isEmpty()){
+ return null;
+ }
DatabasePartition newPartition = null;
OwnershipAndCheckpoint ownAndCheck = mi.getOwnAndCheck();
UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
try {
final OwnershipReturn ownershipReturn = mi.own(ranges, partition, ownOpId);
+ if(ownershipReturn==null){
+ return null;
+ }
Dag dag = ownershipReturn.getDag();
- DagNode node = dag.getNode(ownershipReturn.getRangeId());
- MusicRangeInformationRow row = node.getRow();
- Map<MusicRangeInformationRow, LockResult> lock = new HashMap<>();
- lock.put(row, new LockResult(row.getPartitionIndex(), ownershipReturn.getOwnerId(), true, ranges));
- ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, lock, ownershipReturn.getOwnershipId());
- newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(),
- ownershipReturn.getOwnerId());
+ if(dag!=null) {
+ DagNode node = dag.getNode(ownershipReturn.getRangeId());
+ MusicRangeInformationRow row = node.getRow();
+ Map<MusicRangeInformationRow, LockResult> lock = new HashMap<>();
+ lock.put(row, new LockResult(row.getPartitionIndex(), ownershipReturn.getOwnerId(), true, ranges));
+ ownAndCheck.checkpoint(this.mi, this.dbi, dag, ranges, lock, ownershipReturn.getOwnershipId());
+ newPartition = new DatabasePartition(ownershipReturn.getRanges(), ownershipReturn.getRangeId(),
+ ownershipReturn.getOwnerId());
+ }
}
finally{
ownAndCheck.stopOwnershipTimeoutClock(ownOpId);
this.table = table.toUpperCase();
}
- public String toString(){return table;}
+ public String toString(){return table.toUpperCase();}
/**
* Compares to Range types
transactionInfo.deleteTxProgress(connectionId);
try {
Connection conn = mdbcConnections.get(connectionId);
- if(conn!=null)
+ if(conn!=null) {
conn.close();
+ }
} catch (SQLException e) {
logger.error(EELFLoggerDelegate.errorLogger, e.getMessage(),AppMessages.UNKNOWNERROR, ErrorSeverity.CRITICAL,
ErrorTypes.GENERALSERVICEERROR);
mdbcConnections.remove(connectionId);
}
if(connectionRanges.containsKey(connectionId)){
- //TODO release lock?
+ //We relinquish all locks obtained by a given
+ relinquish(connectionRanges.get(connectionId));
connectionRanges.remove(connectionId);
}
}
//\TODO Prefetch data to system using the data ranges as guide
throw new UnsupportedOperationException("Function initialize system needs to be implemented id MdbcStateManager");
}
+
+ private void relinquish(DatabasePartition partition){
+ try {
+ musicInterface.relinquish(partition.getLockId(),partition.getMRIIndex().toString());
+ } catch (MDBCServiceException e) {
+ logger.error("Relinquish failed, would need to forcefully obtain lock later");
+ }
+
+ }
}
public String nodeName;
public String sqlDatabaseName;
- public NodeConfiguration(String tables, UUID mriIndex, String sqlDatabaseName, String node){
+ public NodeConfiguration(String tables, String eventualTables, UUID mriIndex, String sqlDatabaseName, String node){
// public DatabasePartition(List<Range> knownRanges, UUID mriIndex, String mriTable, String lockId, String musicTxDigestTable) {
partition = new DatabasePartition(toRanges(tables), mriIndex, null) ;
- eventual = new Eventual(toRanges(tables));
+ eventual = new Eventual(toRanges(eventualTables));
this.nodeName = node;
this.sqlDatabaseName = sqlDatabaseName;
}
*/
package org.onap.music.mdbc.configurations;
+import com.datastax.driver.core.ResultSet;
+import java.util.stream.Collectors;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
import org.onap.music.mdbc.MDBCUtils;
* @apiNote This function assumes that when used, there is not associated redo history in the tables to the tables that are going to be managed by this configuration file
*/
public List<NodeConfiguration> initializeAndCreateNodeConfigurations() throws MDBCServiceException {
+ logger.info("initializing the required spaces");
initInternalNamespace();
List<NodeConfiguration> nodeConfigs = new ArrayList<>();
}
for(PartitionInformation partitionInfo : partitions){
String mriTableName = partitionInfo.mriTableName;
+ checkIfMriIsEmpty(mriTableName);
//0) Create the corresponding Music Range Information table
+ MusicMixin.createMusicRangeInformationTable(musicNamespace,mriTableName);
String partitionId;
if(partitionInfo.partitionId==null || partitionInfo.partitionId.isEmpty()){
throw new MDBCServiceException("Replication factor and partition id are both empty, and this is an invalid configuration");
}
//1) Create a row in the partition info table
- //partitionId = DatabaseOperations.createPartitionInfoRow(musicNamespace,pitName,partitionInfo.replicationFactor,partitionInfo.tables,null);
-
+ partitionId = MDBCUtils.generateTimebasedUniqueKey().toString();
}
else{
partitionId = partitionInfo.partitionId;
}
//2) Create a row in the transaction information table
- UUID mriTableIndex = MDBCUtils.generateTimebasedUniqueKey();
- //3) Add owner and tit information to partition info table
- RedoRow newRedoRow = new RedoRow(mriTableName,mriTableIndex);
- //DatabaseOperations.updateRedoRow(musicNamespace,pitName,partitionId,newRedoRow,partitionInfo.owner,null);
- //4) Update ttp with the new partition
- //for(String table: partitionInfo.tables) {
- //DatabaseOperations.updateTableToPartition(musicNamespace, ttpName, table, partitionId, null);
- //}
- //5) Add it to the redo history table
- //DatabaseOperations.createRedoHistoryBeginRow(musicNamespace,rhName,newRedoRow,partitionId,null);
- //6) Create config for this node
+
+ logger.info("Creating empty row with id "+partitionId);
+ MusicMixin.createEmptyMriRow(musicNamespace,partitionInfo.mriTableName,UUID.fromString(partitionId),
+ partitionInfo.owner,null,partitionInfo.getTables(),true);
+
+ //3) Create config for this node
StringBuilder newStr = new StringBuilder();
for(Range r: partitionInfo.tables){
- newStr.append(r.toString()).append(",");
+ newStr.append(r.toString().toUpperCase()).append(",");
}
- nodeConfigs.add(new NodeConfiguration(newStr.toString(),mriTableIndex,
+
+ logger.info("Appending row to configuration "+partitionId);
+ nodeConfigs.add(new NodeConfiguration(newStr.toString(),"",UUID.fromString(partitionId),
sqlDatabaseName, partitionInfo.owner));
}
return nodeConfigs;
}
+ private void checkIfMriIsEmpty(String mriTableName) throws MDBCServiceException {
+ //First check if table exists
+ StringBuilder checkTableExistsString = new StringBuilder("SELECT table_name FROM system_schema.tables WHERE keyspace_name='")
+ .append(musicNamespace)
+ .append("';");
+ PreparedQueryObject checkTableExists = new PreparedQueryObject();
+ checkTableExists.appendQueryString(checkTableExistsString.toString());
+ final ResultSet resultSet = MusicCore.quorumGet(checkTableExists);
+ if(resultSet.isExhausted()){
+ //Table doesn't exist
+ return;
+ }
+ //If exists, check if empty
+ StringBuilder checkRowsInTableString = new StringBuilder("SELECT * FROM ")
+ .append(musicNamespace)
+ .append(".")
+ .append(mriTableName)
+ .append("';");
+ PreparedQueryObject checkRowsInTable = new PreparedQueryObject();
+ checkRowsInTable.appendQueryString(checkRowsInTableString.toString());
+ final ResultSet resultSet2 = MusicCore.quorumGet(checkTableExists);
+ if(!resultSet2.isExhausted()) {
+ throw new MDBCServiceException("When initializing the configuration of the system, the MRI should not exits "
+ + "be empty");
+ }
+ }
+
+
private void initInternalNamespace() throws MDBCServiceException {
StringBuilder createKeysTableCql = new StringBuilder("CREATE TABLE IF NOT EXISTS ")
.append(internalNamespace)
{
"tables": [
{
- "table": "table11"
+ "table": "PERSONS"
}
],
"owner": "",
private String musicRangeInformationTableName = "musicrangeinformation";
private String musicRangeDependencyTableName = "musicrangedependency";
- private EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class);
+ private static EELFLoggerDelegate logger = EELFLoggerDelegate.getLogger(MusicMixin.class);
try {
createMusicTxDigest();//\TODO If we start partitioning the data base, we would need to use the redotable number
createMusicEventualTxDigest();
- createMusicRangeInformationTable();
+ createMusicRangeInformationTable(this.music_ns,this.musicRangeInformationTableName);
createMusicRangeDependencyTable();
}
catch(MDBCServiceException e){
}
}
-
/**
* Writes the transaction information to metric's txDigest and musicRangeInformation table
* This officially commits the transaction globally
*/
@Override
- public void commitLog(DatabasePartition partition, List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest, String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
-
+ public void commitLog(DatabasePartition partition,List<Range> eventualRanges, HashMap<Range,StagingTable> transactionDigest,
+ String txId ,TxCommitProgress progressKeeper) throws MDBCServiceException{
+ if(partition==null){
+ logger.warn("Trying tcommit log with null partition");
+ return;
+ }
+ List<Range> snapshot = partition.getSnapshot();
+ if(snapshot==null || snapshot.isEmpty()){
+ logger.warn("Trying to commit log with empty ranges");
+ return;
+ }
// first deal with commit for eventually consistent tables
filterAndAddEventualTxDigest(eventualRanges, transactionDigest, txId, progressKeeper);
-
- // if strong consistency tables are not present in the transaction then return
- if(partition == null || partition.getMRIIndex() == null)
- return;
-
+
UUID mriIndex = partition.getMRIIndex();
String fullyQualifiedMriKey = music_ns+"."+ this.musicRangeInformationTableName+"."+mriIndex;
//0. See if reference to lock was already created
}
String serializedTransactionDigest;
- try {
- serializedTransactionDigest = MDBCUtils.toString(transactionDigest);
- } catch (IOException e) {
- throw new MDBCServiceException("Failed to serialized transaction digest with error "+e.toString(), e);
- }
- MusicTxDigestId digestId = new MusicTxDigestId(commitId,-1);
- addTxDigest(digestId, serializedTransactionDigest);
- //2. Save RRT index to RQ
- if(progressKeeper!= null) {
- progressKeeper.setRecordId(txId,digestId);
+ if(!transactionDigest.isEmpty()) {
+ try {
+ serializedTransactionDigest = MDBCUtils.toString(transactionDigest);
+ } catch (IOException e) {
+ throw new MDBCServiceException("Failed to serialized transaction digest with error " + e.toString(), e);
+ }
+ MusicTxDigestId digestId = new MusicTxDigestId(commitId, -1);
+ addTxDigest(digestId, serializedTransactionDigest);
+ //2. Save RRT index to RQ
+ if (progressKeeper != null) {
+ progressKeeper.setRecordId(txId, digestId);
+ }
+ //3. Append RRT index into the corresponding TIT row array
+ appendToRedoLog(partition, digestId);
}
- //3. Append RRT index into the corresponding TIT row array
- appendToRedoLog(partition,digestId);
}
private void filterAndAddEventualTxDigest(List<Range> eventualRanges,
* * Redo: list of uiids associated to the Redo Records Table
*
*/
- private void createMusicRangeInformationTable() throws MDBCServiceException {
- String tableName = this.musicRangeInformationTableName;
+ public static void createMusicRangeInformationTable(String namespace, String tableName) throws MDBCServiceException {
String priKey = "rangeid";
StringBuilder fields = new StringBuilder();
fields.append("rangeid uuid, ");
//TODO: Frozen is only needed for old versions of cassandra, please update correspondingly
fields.append("txredolog list<frozen<tuple<text,uuid>>> ");
String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (%s, PRIMARY KEY (%s));",
- this.music_ns, tableName, fields, priKey);
+ namespace, tableName, fields, priKey);
try {
- executeMusicWriteQuery(this.music_ns,tableName,cql);
+ executeMusicWriteQuery(namespace,tableName,cql);
} catch (MDBCServiceException e) {
logger.error("Initialization error: Failure to create transaction information table");
throw(e);
if(lockId == null || lockId.isEmpty()){
throw new MDBCServiceException("Error initializing music range information, error creating a lock for a new row") ;
}
- createEmptyMriRow(newPartition.getMRIIndex(),info.getMetricProcessId(),lockId,
- newPartition.getSnapshot(),info.getIsLatest());
+ logger.info("Creating MRI " + newPartition.getMRIIndex() + " for ranges " + newPartition.getSnapshot());
+ createEmptyMriRow(this.music_ns,this.musicRangeInformationTableName,newPartition.getMRIIndex(),info.getMetricProcessId(),
+ lockId, newPartition.getSnapshot(),info.getIsLatest());
info.setOwnerId(lockId);
return newPartition;
}
private UUID createEmptyMriRow(String processId, String lockId, List<Range> ranges)
throws MDBCServiceException {
UUID id = MDBCUtils.generateTimebasedUniqueKey();
- return createEmptyMriRow(id,processId,lockId,ranges,true);
-
+ logger.info("Creating MRI "+ id + " for ranges " + ranges);
+ return createEmptyMriRow(this.music_ns,this.musicRangeInformationTableName,id,processId,lockId,ranges,true);
}
/**
* @param processId id of the process that is going to own initially this.
* @return uuid associated to the new row
*/
- private UUID createEmptyMriRow(UUID id, String processId, String lockId, List<Range> ranges, boolean isLatest)
+ public static UUID createEmptyMriRow(String musicNamespace, String mriTableName, UUID id, String processId,
+ String lockId, List<Range> ranges, boolean isLatest)
throws MDBCServiceException{
- logger.info("Creating MRI " + id + " for ranges " + ranges);
StringBuilder insert = new StringBuilder("INSERT INTO ")
- .append(this.music_ns)
+ .append(musicNamespace)
.append('.')
- .append(this.musicRangeInformationTableName)
+ .append(mriTableName)
.append(" (rangeid,keys,ownerid,islatest,metricprocessid,txredolog) VALUES ")
.append("(")
.append(id)
PreparedQueryObject query = new PreparedQueryObject();
query.appendQueryString(insert.toString());
try {
- executeMusicLockedPut(this.music_ns,this.musicRangeInformationTableName,id.toString(),query,lockId,null);
+ executeMusicLockedPut(musicNamespace,mriTableName,id.toString(),query,lockId,null);
} catch (MDBCServiceException e) {
- logger.error("Initialization error: Failure to add new row to transaction information");
throw new MDBCServiceException("Initialization error:Failure to add new row to transaction information", e);
}
return id;
@Override
public void appendToRedoLog(DatabasePartition partition, MusicTxDigestId newRecord) throws MDBCServiceException {
logger.info("Appending to redo log for partition " + partition.getMRIIndex() + " txId=" + newRecord.txId);
- PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, partition.getMRIIndex(), musicTxDigestTableName, newRecord.txId);
- ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, partition.getMRIIndex().toString(), appendQuery, partition.getLockId(), null);
+ PreparedQueryObject appendQuery = createAppendMtxdIndexToMriQuery(musicRangeInformationTableName, partition.getMRIIndex(),
+ musicTxDigestTableName, newRecord.txId);
+ ReturnType returnType = MusicCore.criticalPut(music_ns, musicRangeInformationTableName, partition.getMRIIndex().toString(),
+ appendQuery, partition.getLockId(), null);
if(returnType.getResult().compareTo(ResultType.SUCCESS) != 0 ){
logger.error(EELFLoggerDelegate.errorLogger, "Error when executing append operation with return type: "+returnType.getMessage());
throw new MDBCServiceException("Error when executing append operation with return type: "+returnType.getMessage());
String cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName);
// Ex 1 & 2 might return millions of records!! things to consider outOfMemory issue, performance issue etc.. How to overcome??
// Ex 3: will return less records compare to Ex:1 and Ex:2.
- PreparedQueryObject pQueryObject = new PreparedQueryObject();
- pQueryObject.appendQueryString(cql);
// I need to get a ResultSet of all the records and give each row to the below HashMap.
- ResultSet rs = executeMusicRead(pQueryObject.toString());
+ ResultSet rs = executeMusicRead(cql);
while (!rs.isExhausted()) {
Row row = rs.one();
String digest = row.getString("transactiondigest");
private List<Range> lockRow(LockRequest request, DatabasePartition partition,Map<UUID, LockResult> rowLock)
throws MDBCServiceException {
- if(partition.getMRIIndex()==request.id && partition.isLocked()){
+ if(partition.getMRIIndex().equals(request.id) && partition.isLocked()){
return new ArrayList<>();
}
//\TODO: this function needs to be improved, to track possible changes in the owner of a set of ranges
}
private void unlockKeyInMusic(String table, String key, String lockref) {
- String fullyQualifiedKey= music_ns+"."+ table+"."+lockref;
- MusicCore.destroyLockRef(fullyQualifiedKey,lockref);
+ String fullyQualifiedKey= music_ns+"."+ table+"."+lockref;
+ MusicCore.destroyLockRef(fullyQualifiedKey,lockref);
}
private void releaseLocks(Map<UUID,LockResult> newLocks) throws MDBCServiceException{
if(ranges == null || ranges.isEmpty())
return null;
-
+
Map<UUID,LockResult> newLocks = new HashMap<>();
//Init timeout clock
ownAndCheck.startOwnershipTimeoutClock(opId);
+ if(partition.isLocked()&&partition.getSnapshot().containsAll(ranges)) {
+ return new OwnershipReturn(opId,partition.getLockId(),partition.getMRIIndex(),partition.getSnapshot(),null);
+ }
//Find
List<Range> extendedRanges = getExtendedRanges(ranges);
List<MusicRangeInformationRow> allMriRows = getAllMriRows();
DagNode node = dag.nextToOwn();
MusicRangeInformationRow row = node.getRow();
UUID uuid = row.getPartitionIndex();
- if(partition.isLocked()&&partition.getMRIIndex()==uuid||
+ if(partition.isLocked()&&partition.getMRIIndex().equals(uuid)||
newLocks.containsKey(uuid) ||
!row.getIsLatest()){
dag.setOwn(node);
}
final LockResult lockResult = lock.get(newUUID);
partition.updateDatabasePartition(newPartition);
- createEmptyMriRow(partition.getMRIIndex(),myId,lockResult.getOwnerId(),partition.getSnapshot(),true);
+ logger.info("Creating MRI " +partition.getMRIIndex()+ " for ranges " + partition.getSnapshot());
+ createEmptyMriRow(this.music_ns,this.musicRangeInformationTableName,partition.getMRIIndex(),myId,
+ lockResult.getOwnerId(),partition.getSnapshot(),true);
return oldIds;
}
@Override
public void relinquish(String ownerId, String rangeId) throws MDBCServiceException{
+ if(ownerId==null||ownerId.isEmpty()||rangeId==null||rangeId.isEmpty()){
+ return;
+ }
String fullyQualifiedMriKey = music_ns+"."+ musicRangeInformationTableName+"."+rangeId;
try {
MusicCore.voluntaryReleaseLock(fullyQualifiedMriKey,ownerId);
private static Row executeMusicUnlockedQuorumGet(PreparedQueryObject cqlObject)
throws MDBCServiceException{
ResultSet result = MusicCore.quorumGet(cqlObject);
- if(result.isExhausted()){
+ if(result == null || result.isExhausted()){
throw new MDBCServiceException("There is not a row that matches the query: ["+cqlObject.getQuery()+"]");
}
return result.one();
}
- private void executeMusicLockedPut(String namespace, String tableName,
+ private static void executeMusicLockedPut(String namespace, String tableName,
String primaryKeyWithoutDomain, PreparedQueryObject queryObject, String lockId,
Condition conditionInfo) throws MDBCServiceException {
ReturnType rt ;
@Override
public void replayTransaction(HashMap<Range,StagingTable> digest) throws MDBCServiceException{
+ //\TODO: implement logic to move data from digests to Music Data Tables
//throw new NotImplementedException("Error, replay transaction in music mixin needs to be implemented");
+ return;
}
@Override
}
finally {
try {
- rs.close();
+ if(rs!=null) {
+ rs.close();
+ }
} catch (SQLException e) {
//continue
}
*/
package org.onap.music.mdbc.tools;
+import org.onap.music.datastore.MusicDataStore;
+import org.onap.music.datastore.MusicDataStoreHandle;
import org.onap.music.exceptions.MDBCServiceException;
import org.onap.music.logging.EELFLoggerDelegate;
+import org.onap.music.main.MusicUtil;
import org.onap.music.mdbc.configurations.NodeConfiguration;
import org.onap.music.mdbc.configurations.TablesConfiguration;
import com.beust.jcommander.JCommander;
public void readInput(){
+ LOG.info("Reading inputs");
try {
inputConfig = TablesConfiguration.readJsonFromFile(tableConfigurationsFile);
} catch (FileNotFoundException e) {
}
public static void main(String[] args) {
+ LOG.info("Starting create node configuration executor");
+ LOG.info("Using music file configuration:"+MusicUtil.getMusicPropertiesFilePath());
CreateNodeConfigurations configs = new CreateNodeConfigurations();
@SuppressWarnings("deprecation")
JCommander jc = new JCommander(configs, args);
@Parameter(names = { "-t", "--tables" }, required = true,
description = "This is the tables that are assigned to this ")
private String tables;
+ @Parameter(names = { "-e", "--tables" }, required = true,
+ description = "This is the tables that are assigned to this ")
+ private String eventual;
@Parameter(names = { "-f", "--file" }, required = true,
description = "This is the output file that is going to have the configuration for the ranges")
private String file;
}
public void convert(){
- config = new NodeConfiguration(tables, UUID.fromString(mriIndex),"test","");
+ config = new NodeConfiguration(tables, eventual,UUID.fromString(mriIndex),"test","");
}
public void saveToFile(){
<!--<jmxConfigurator /> -->
<!-- specify the component name -->
+ <property name="catalina.home" value="/var/log/metric/" />
<property name="componentName" value="mdbc"></property>
<!-- specify the base path of the log directory -->
cassandra.host =\
- localhost
+ 143.215.128.49
cassandra.user =\
- cassandra
+ metric
cassandra.password =\
- cassandra
+ metriccluster
zookeeper.host =\
localhost
break;
case "all.public.ips":
String[] ips = prop.getProperty(key).split(":");
- if (ips.length == 1) {
+ if (ips.length== 1) {
// Future use
} else if (ips.length > 1) {
MusicUtil.setAllPublicIps(
@Test(timeout=1000)
public void own() {
- Range range = new Range("table1");
+ Range range = new Range("TABLE1");
List<Range> ranges = new ArrayList<>();
ranges.add(range);
final DatabasePartition partition = TestUtils.createBasicRow(range, mixin, mdbcServerName);
@Test(timeout=1000)
public void own2() throws InterruptedException, MDBCServiceException {
List<Range> range12 = new ArrayList<>( Arrays.asList(
- new Range("range1"),
- new Range("range2")
+ new Range("RANGE1"),
+ new Range("RANGE2")
));
List<Range> range34 = new ArrayList<>( Arrays.asList(
- new Range("range3"),
- new Range("range4")
+ new Range("RANGE3"),
+ new Range("RANGE4")
));
List<Range> range24 = new ArrayList<>( Arrays.asList(
- new Range("range2"),
- new Range("range4")
+ new Range("RANGE2"),
+ new Range("RANGE4")
));
List<Range> range123 = new ArrayList<>( Arrays.asList(
- new Range("range1"),
- new Range("range2"),
- new Range("range3")
+ new Range("RANGE1"),
+ new Range("RANGE2"),
+ new Range("RANGE3")
));
DatabasePartition db1 = addRow(range12, false);
DatabasePartition db2 = addRow(range34, false);
DagNode missing = outgoingEdges.get(0);
Set<Range> missingRanges = missing.getRangeSet();
assertEquals(2,missingRanges.size());
- assertTrue(missingRanges.contains(new Range("range1")));
- assertTrue(missingRanges.contains(new Range("range3")));
+ assertTrue(missingRanges.contains(new Range("RANGE1")));
+ assertTrue(missingRanges.contains(new Range("RANGE3")));
List<DagNode> outgoingEdges1 = missing.getOutgoingEdges();
assertEquals(1,outgoingEdges1.size());
assertFalse(finalNode.hasNotIncomingEdges());
Set<Range> finalSet = finalNode.getRangeSet();
assertEquals(3,finalSet.size());
- assertTrue(finalSet.contains(new Range("range1")));
- assertTrue(finalSet.contains(new Range("range2")));
- assertTrue(finalSet.contains(new Range("range3")));
+ assertTrue(finalSet.contains(new Range("RANGE1")));
+ assertTrue(finalSet.contains(new Range("RANGE2")));
+ assertTrue(finalSet.contains(new Range("RANGE3")));
DagNode node5 = dag.getNode(db5.getMRIIndex());
List<DagNode> toRemoveOutEdges = node5.getOutgoingEdges();
OwnershipReturn own = cleanAndOwnPartition(ranges,ownOpId);
Map<MusicRangeInformationRow, LockResult> locks = new HashMap<>();
- locks.put(own.getDag().getNode(own.getRangeId()).getRow(),new LockResult(own.getRangeId(),own.getOwnerId(),true,
- ranges));
- ownAndCheck.checkpoint(musicMixin,mysqlMixin,own.getDag(),ranges,locks, ownOpId);
+ if(own.getDag()!=null) {
+ locks.put(own.getDag().getNode(own.getRangeId()).getRow(),
+ new LockResult(own.getRangeId(), own.getOwnerId(), true,
+ ranges));
+ ownAndCheck.checkpoint(musicMixin, mysqlMixin, own.getDag(), ranges, locks, ownOpId);
+ }
checkData();
}
UUID ownOpId = MDBCUtils.generateTimebasedUniqueKey();
OwnershipReturn own = cleanAndOwnPartition(ranges,ownOpId);
+
Map<MusicRangeInformationRow, LockResult> locks = new HashMap<>();
- locks.put(own.getDag().getNode(own.getRangeId()).getRow(),new LockResult(own.getRangeId(),own.getOwnerId(),true,
- ranges));
+ if(own.getDag()!=null) {
+ locks.put(own.getDag().getNode(own.getRangeId()).getRow(),
+ new LockResult(own.getRangeId(), own.getOwnerId(), true,
+ ranges));
+ }
ownAndCheck.warmup(musicMixin,mysqlMixin,ranges);
checkData();
<modules>
<module>mdbc-server</module>
<module>mdbc-packages</module>
+ <module>mdbc-benchmark</module>
</modules>
<dependencies>
<artifactId>netty-handler</artifactId>
<version>4.1.30.Final</version>
</dependency>
+ <dependency>
+ <groupId>com.beust</groupId>
+ <artifactId>jcommander</artifactId>
+ <version>1.72</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ <version>3.0</version>
+ </dependency>
</dependencies>
<build>