Fix Eventual consistency 58/78158/2
authorst782s <statta@research.att.com>
Fri, 8 Feb 2019 22:30:33 +0000 (17:30 -0500)
committerSunder Tattavarada <statta@research.att.com>
Mon, 11 Feb 2019 12:10:06 +0000 (12:10 +0000)
Issue-ID: MUSIC-319
Change-Id: I7433e7d71c44ffe623e560863e7e1fd9c0f247ce
Signed-off-by: st782s <statta@research.att.com>
mdbc-server/src/main/java/org/onap/music/mdbc/examples/MdbcTestClient.java [changed mode: 0755->0644]
mdbc-server/src/main/java/org/onap/music/mdbc/mixins/MusicMixin.java
mdbc-server/src/main/resources/music.properties

old mode 100755 (executable)
new mode 100644 (file)
index 35293ef..6a17d4c
-/*
- * ============LICENSE_START====================================================
- * org.onap.music.mdbc
- * =============================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
- * =============================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END======================================================
- */
-package org.onap.music.mdbc.examples;
-
-import java.sql.*;
-import org.apache.calcite.avatica.remote.Driver;
-
-public class MdbcTestClient {
-
-    public static class Hr {
-        public final Employee[] emps = {
-                new Employee(100, "Bill"),
-                new Employee(200, "Eric"),
-                new Employee(150, "Sebastian"),
-        };
-    }
-
-    public static class Employee {
-        public final int empid;
-        public final String name;
-
-        public Employee(int empid, String name) {
-            this.empid = empid;
-            this.name = name;
-        }
-    }
-
-    public static void main(String[] args){
-        try {
-            Class.forName("org.apache.calcite.avatica.remote.Driver");
-        } catch (ClassNotFoundException e) {
-            e.printStackTrace();
-            System.exit(1);
-        }
-        Connection connection;
-        try {
-            connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");
-        } catch (SQLException e) {
-            e.printStackTrace();
-            return;
-        }
-
-        try {
-        connection.setAutoCommit(false);
-        } catch (SQLException e) {
-            e.printStackTrace();
-            return;
-        }
-
-
-        final String sql = "CREATE TABLE IF NOT EXISTS Persons (\n" +
-                "    PersonID int,\n" +
-                "    LastName varchar(255),\n" +
-                "    FirstName varchar(255),\n" +
-                "    Address varchar(255),\n" +
-                "    City varchar(255),\n" +
-                "    PRIMARY KEY (PersonID,LastName)" +
-                ");";
-        Statement stmt;
-        try {
-            stmt = connection.createStatement();
-        } catch (SQLException e) {
-            e.printStackTrace();
-            return;
-        }
-
-        boolean execute;
-        try {
-            execute = stmt.execute(sql);
-        } catch (SQLException e) {
-            e.printStackTrace();
-            return;
-        }
-
-        if (execute) {
-            try {
-                connection.commit();
-            } catch (SQLException e) {
-                e.printStackTrace();
-            }
-        }
-
-        try {
-            stmt.close();
-        } catch (SQLException e) {
-            e.printStackTrace();
-        }
-
-        final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');";
-        final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;";
-        final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";
-        final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';";
-        final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";
-
-
-        Statement insertStmt;
-        try {
-            insertStmt = connection.createStatement();
-        } catch (SQLException e) {
-            e.printStackTrace();
-            return;
-        }
-
-        try {
-            execute = insertStmt.execute(insertSQL);
-            execute = insertStmt.execute(insertSQL1);
-            execute = insertStmt.execute(insertSQL2);
-            execute = insertStmt.execute(insertSQL3);
-            execute = insertStmt.execute(insertSQL4);
-
-        } catch (SQLException e) {
-            e.printStackTrace();
-            return;
-        }
-
-        try {
-            connection.commit();
-        } catch (SQLException e) {
-            e.printStackTrace();
-            return;
-        }
-
-        try {
-            stmt.close();
-            insertStmt.close();
-        } catch (SQLException e) {
-            e.printStackTrace();
-        }
-
-        try {
-            connection.close();
-        } catch (SQLException e) {
-            e.printStackTrace();
-        }
-
-
-    }
-}
+/*\r
+ * ============LICENSE_START====================================================\r
+ * org.onap.music.mdbc\r
+ * =============================================================================\r
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.\r
+ * =============================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ * \r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * \r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END======================================================\r
+ */\r
+package org.onap.music.mdbc.examples;\r
+\r
+import java.sql.*;\r
+import org.apache.calcite.avatica.remote.Driver;\r
+\r
+public class MdbcTestClient {\r
+\r
+    public static class Hr {\r
+        public final Employee[] emps = {\r
+                new Employee(100, "Bill"),\r
+                new Employee(200, "Eric"),\r
+                new Employee(150, "Sebastian"),\r
+        };\r
+    }\r
+\r
+    public static class Employee {\r
+        public final int empid;\r
+        public final String name;\r
+\r
+        public Employee(int empid, String name) {\r
+            this.empid = empid;\r
+            this.name = name;\r
+        }\r
+    }\r
+\r
+    public static void main(String[] args){\r
+        try {\r
+            Class.forName("org.apache.calcite.avatica.remote.Driver");\r
+        } catch (ClassNotFoundException e) {\r
+            e.printStackTrace();\r
+            System.exit(1);\r
+        }\r
+        Connection connection;\r
+        try {\r
+            connection = DriverManager.getConnection("jdbc:avatica:remote:url=http://localhost:30000/test;serialization=protobuf");\r
+        } catch (SQLException e) {\r
+            e.printStackTrace();\r
+            return;\r
+        }\r
+\r
+        try {\r
+        connection.setAutoCommit(false);\r
+        } catch (SQLException e) {\r
+            e.printStackTrace();\r
+            return;\r
+        }\r
+\r
+\r
+        final String sql = "CREATE TABLE IF NOT EXISTS Persons (\n" +\r
+                "    PersonID int,\n" +\r
+                "    LastName varchar(255),\n" +\r
+                "    FirstName varchar(255),\n" +\r
+                "    Address varchar(255),\n" +\r
+                "    City varchar(255),\n" +\r
+                "    PRIMARY KEY (PersonID,LastName)" +\r
+                ");";\r
+        Statement stmt;\r
+        try {\r
+            stmt = connection.createStatement();\r
+        } catch (SQLException e) {\r
+            e.printStackTrace();\r
+            return;\r
+        }\r
+\r
+        boolean execute = true;\r
+//        try {\r
+//            execute = stmt.execute(sql);\r
+//        } catch (SQLException e) {\r
+//            e.printStackTrace();\r
+//            return;\r
+//        }\r
+\r
+        if (execute) {\r
+            try {\r
+                connection.commit();\r
+            } catch (SQLException e) {\r
+                e.printStackTrace();\r
+            }\r
+        }\r
+\r
+        try {\r
+            stmt.close();\r
+        } catch (SQLException e) {\r
+            e.printStackTrace();\r
+        }\r
+\r
+        final String insertSQL = "INSERT INTO Persons VALUES (1, 'Martinez', 'Juan', 'KACB', 'ATLANTA');";\r
+        final String insertSQL1 = "DELETE FROM Persons WHERE PersonID=1;";\r
+        final String insertSQL2 = "INSERT INTO Persons VALUES (2, 'Smith', 'JOHN', 'GNOC', 'BEDMINSTER');";\r
+        final String insertSQL3 = "UPDATE Persons SET FirstName='JOSH' WHERE LastName='Smith';";\r
+        final String insertSQL4 = "UPDATE Persons SET FirstName='JOHN' WHERE LastName='Smith';";\r
+\r
+\r
+        Statement insertStmt;\r
+        try {\r
+            insertStmt = connection.createStatement();\r
+        } catch (SQLException e) {\r
+            e.printStackTrace();\r
+            return;\r
+        }\r
+\r
+        try {\r
+            execute = insertStmt.execute(insertSQL);\r
+            execute = insertStmt.execute(insertSQL1);\r
+            execute = insertStmt.execute(insertSQL2);\r
+            execute = insertStmt.execute(insertSQL3);\r
+            execute = insertStmt.execute(insertSQL4);\r
+\r
+        } catch (SQLException e) {\r
+            e.printStackTrace();\r
+            return;\r
+        }\r
+\r
+        try {\r
+            connection.commit();\r
+        } catch (SQLException e) {\r
+            e.printStackTrace();\r
+            return;\r
+        }\r
+\r
+        try {\r
+            stmt.close();\r
+            insertStmt.close();\r
+        } catch (SQLException e) {\r
+            e.printStackTrace();\r
+        }\r
+\r
+        try {\r
+            connection.close();\r
+        } catch (SQLException e) {\r
+            e.printStackTrace();\r
+        }\r
+\r
+\r
+    }\r
+}\r
index 963647c..0210cd1 100644 (file)
@@ -998,7 +998,7 @@ public class MusicMixin implements MusicInterface {
      * @return a ResultSet containing the rows returned from the query
      */
     protected ResultSet executeMusicRead(String cql) throws MDBCServiceException {
-        logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC write:"+ cql);
+        logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC read:"+ cql);
         PreparedQueryObject pQueryObject = new PreparedQueryObject();
         pQueryObject.appendQueryString(cql);
         ResultSet results = null;
@@ -1010,6 +1010,23 @@ public class MusicMixin implements MusicInterface {
         }
         return results;
     }
+    
+    /**
+     * This method executes a read query in Music
+     * @param pQueryObject the PreparedQueryObject to be sent to Cassandra
+     * @return a ResultSet containing the rows returned from the query
+     */
+    protected ResultSet executeMusicRead(PreparedQueryObject pQueryObject) throws MDBCServiceException {
+        logger.debug(EELFLoggerDelegate.applicationLogger, "Executing MUSIC read:"+ pQueryObject.getQuery());
+        ResultSet results = null;
+        try {
+            results = MusicCore.get(pQueryObject);
+        } catch (MusicServiceException e) {
+            logger.error("Error executing music get operation for query: ["+pQueryObject.getQuery()+"]");
+            throw new MDBCServiceException("Error executing get: "+e.getMessage(), e);
+        }
+        return results;
+    }
 
     /**
      * Returns the default primary key name that this mixin uses
@@ -1924,23 +1941,23 @@ public class MusicMixin implements MusicInterface {
         HashMap<Range,StagingTable> changes;
         String cql;
         LinkedHashMap<UUID, HashMap<Range,StagingTable>> ecDigestInformation = new LinkedHashMap<UUID, HashMap<Range,StagingTable>>();
-        String musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName);
+        UUID musicevetxdigestNodeinfoTimeID = getTxTimeIdFromNodeInfo(nodeName);
         PreparedQueryObject pQueryObject = new PreparedQueryObject();
         
-        if (musicevetxdigestNodeinfoTimeID != null && !musicevetxdigestNodeinfoTimeID.isEmpty() ) {
+        if (musicevetxdigestNodeinfoTimeID != null) {
             // this will fetch only few records based on the time-stamp condition.
-            cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ?;", music_ns, this.musicEventualTxDigestTableName);
+            cql = String.format("SELECT * FROM %s.%s WHERE txtimeid > ? LIMIT 10 ALLOW FILTERING;", music_ns, this.musicEventualTxDigestTableName);
             pQueryObject.appendQueryString(cql);
             pQueryObject.addValue(musicevetxdigestNodeinfoTimeID);
             
         } else {
             // This is going to Fetch all the Transactiondigest records from the musicevetxdigest table.
-            cql = String.format("SELECT * FROM %s.%s ;", music_ns, this.musicEventualTxDigestTableName);
+            cql = String.format("SELECT * FROM %s.%s LIMIT 10;", music_ns, this.musicEventualTxDigestTableName);
             pQueryObject.appendQueryString(cql);
         }
         
         // I need to get a ResultSet of all the records and give each row to the below HashMap.
-        ResultSet rs = executeMusicRead(pQueryObject.getQuery());
+        ResultSet rs = executeMusicRead(pQueryObject);
         while (!rs.isExhausted()) {
             Row row = rs.one();
             String digest = row.getString("transactiondigest");        
@@ -2561,13 +2578,16 @@ public class MusicMixin implements MusicInterface {
     @Override
     public void updateNodeInfoTableWithTxTimeIDKey(UUID txTimeID, String nodeName) throws MDBCServiceException{
         
-           String cql = String.format("UPDATE %s.%s SET txtimeid = (%s), txupdatedatetime = now() WHERE nodename = ?;", music_ns, this.musicEventualTxDigestTableName, txTimeID);
+           String cql = String.format("UPDATE %s.%s SET txtimeid = %s, txupdatedatetime = now() WHERE nodename = ?;", music_ns, this.musicNodeInfoTableName, txTimeID);
             PreparedQueryObject pQueryObject = new PreparedQueryObject();
             pQueryObject.appendQueryString(cql);
             pQueryObject.addValue(nodeName);
-        
-            executeMusicWriteQuery(pQueryObject.getQuery());
-            logger.info("Successfully updated nodeinfo table with txtimeid value: " + txTimeID + " against the node:" + nodeName);
+            
+            ReturnType rt = MusicCore.eventualPut(pQueryObject);
+            if(rt.getResult().getResult().toLowerCase().equals("failure")) {
+                logger.error(EELFLoggerDelegate.errorLogger, "Failure while eventualPut...: "+rt.getMessage());
+            }
+            else logger.info("Successfully updated nodeinfo table with txtimeid value: " + txTimeID + " against the node:" + nodeName);
             
         
     }
@@ -2618,7 +2638,7 @@ public class MusicMixin implements MusicInterface {
         }
     }
     
-    public String getTxTimeIdFromNodeInfo(String nodeName) throws MDBCServiceException {
+    public UUID getTxTimeIdFromNodeInfo(String nodeName) throws MDBCServiceException {
             // expecting NodeName from base-0.json file: which is : NJNode
             //String nodeName = MdbcServer.stateManager.getMdbcServerName(); 
             // this retrieves the NJNode row from Cassandra's NodeInfo table so that I can retrieve TimeStamp for further processing.
@@ -2632,13 +2652,12 @@ public class MusicMixin implements MusicInterface {
         } catch (MDBCServiceException e) {
             logger.error("Get operation error: Failure to get row from nodeinfo with nodename:"+nodeName);
             // TODO check underlying exception if no data and return empty string
-            return "";
+            return null;
             //throw new MDBCServiceException("error:Failure to retrive nodeinfo details information", e);
         }
         
-        String txtimeid = newRow.getString("txtimeid");
+        return newRow.getUUID("txtimeid");
 
-        return txtimeid;
     }
 
 
index 21f3e92..83dcb7c 100755 (executable)
@@ -1,8 +1,8 @@
 cassandra.host =\
-  192.168.1.19
+  localhost
 cassandra.user =\
-  metric
+  cassandra
 cassandra.password =\
-  metriccluster
+  cassandra
 zookeeper.host =\
   localhost