Refactor Prov DB handling
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / LogfileLoader.java
index b48907f..18ffea0 100644 (file)
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.TreeSet;
 import java.util.zip.GZIPInputStream;
 import org.onap.dmaap.datarouter.provisioning.BaseServlet;
+import org.onap.dmaap.datarouter.provisioning.ProvRunner;
 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
 import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
@@ -79,17 +80,12 @@ public class LogfileLoader extends Thread {
      */
     private static LogfileLoader logfileLoader;
 
-    /**
-     * The PreparedStatement which is loaded by a <i>Loadable</i>.
-     */
-    private static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
     /**
      * Each server can assign this many IDs.
      */
     private static final long SET_SIZE = (1L << 56);
 
     private final EELFLogger logger;
-    private final DB db;
     private final String spooldir;
     private final long setStart;
     private final long setEnd;
@@ -99,8 +95,7 @@ public class LogfileLoader extends Thread {
 
     private LogfileLoader() {
         this.logger = EELFManager.getInstance().getLogger("InternalLog");
-        this.db = new DB();
-        this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
+        this.spooldir = ProvRunner.getProvProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
         this.setStart = getIdRange();
         this.setEnd = setStart + SET_SIZE - 1;
         this.seqSet = new RLEBitSet();
@@ -126,17 +121,17 @@ public class LogfileLoader extends Thread {
     }
 
     private long getIdRange() {
-        long n;
+        long size;
         if (BaseServlet.isInitialActivePOD()) {
-            n = 0;
+            size = 0;
         } else if (BaseServlet.isInitialStandbyPOD()) {
-            n = SET_SIZE;
+            size = SET_SIZE;
         } else {
-            n = SET_SIZE * 2;
+            size = SET_SIZE * 2;
         }
-        String r = String.format("[%X .. %X]", n, n + SET_SIZE - 1);
-        logger.debug("This server shall assign RECORD_IDs in the range " + r);
-        return n;
+        String range = String.format("[%X .. %X]", size, size + SET_SIZE - 1);
+        logger.debug("This server shall assign RECORD_IDs in the range " + range);
+        return size;
     }
 
     /**
@@ -207,10 +202,10 @@ public class LogfileLoader extends Thread {
             logger.debug("PROV8001 Starting " + infile + " ...");
         }
         long time = System.currentTimeMillis();
-        int[] n = process(infile);
+        int[] array = process(infile);
         time = System.currentTimeMillis() - time;
         logger.info(String.format("PROV8000 Processed %s in %d ms; %d of %d records.",
-            infile.toString(), time, n[0], n[1]));
+            infile.toString(), time, array[0], array[1]));
         try {
             Files.delete(infile.toPath());
         } catch (IOException e) {
@@ -226,10 +221,10 @@ public class LogfileLoader extends Thread {
         Parameters provLogRetention = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
         if (provLogRetention != null) {
             try {
-                long n = Long.parseLong(provLogRetention.getValue());
+                long retention = Long.parseLong(provLogRetention.getValue());
                 // This check is to prevent inadvertent errors from wiping the table out
-                if (n > 1000000L) {
-                    threshold = n;
+                if (retention > 1000000L) {
+                    threshold = retention;
                 }
             } catch (NumberFormatException e) {
                 // ignore
@@ -256,11 +251,10 @@ public class LogfileLoader extends Thread {
             cutoff *= 86400000L;
             logger.debug("  Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
 
-            Connection conn = null;
-            try {
+            try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
                 // Limit to a million at a time to avoid typing up the DB for too long.
-                conn = db.getConnection();
-                try (PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
+                try (PreparedStatement ps = conn.prepareStatement(
+                    "DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
                     ps.setLong(1, cutoff);
                     while (count > 0) {
                         if (!ps.execute()) {
@@ -281,8 +275,6 @@ public class LogfileLoader extends Thread {
                 }
             } catch (SQLException e) {
                 logger.error("LogfileLoader.pruneRecords: " + e.getMessage(), e);
-            } finally {
-                db.release(conn);
             }
         }
         return did1;
@@ -290,14 +282,11 @@ public class LogfileLoader extends Thread {
 
     private long countRecords() {
         long count = 0;
-        try (Connection conn = db.getConnection();
-            Statement stmt = conn.createStatement()) {
-            try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) {
-                if (rs.next()) {
-                    count = rs.getLong("COUNT");
-                }
-            } finally {
-                db.release(conn);
+        try (Connection conn = ProvDbUtils.getInstance().getConnection();
+            PreparedStatement ps = conn.prepareStatement("SELECT COUNT(*) as COUNT from LOG_RECORDS");
+            ResultSet rs = ps.executeQuery()) {
+            if (rs.next()) {
+                count = rs.getLong("COUNT");
             }
         } catch (SQLException e) {
             logger.error("LogfileLoader.countRecords: " + e.getMessage(), e);
@@ -307,18 +296,16 @@ public class LogfileLoader extends Thread {
 
     private Map<Long, Long> getHistogram() {
         Map<Long, Long> map = new HashMap<>();
-        try (Connection conn = db.getConnection();
-            Statement stmt = conn.createStatement()) {
+        try (Connection conn = ProvDbUtils.getInstance().getConnection();
+            PreparedStatement ps = conn.prepareStatement(
+                "SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY");
+            ResultSet rs = ps.executeQuery()) {
             logger.debug("  LOG_RECORD table histogram...");
-            try (ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) {
-                while (rs.next()) {
-                    long day = rs.getLong("DAY");
-                    long cnt = rs.getLong("COUNT");
-                    map.put(day, cnt);
-                    logger.debug("  " + day + "  " + cnt);
-                }
-            } finally {
-                db.release(conn);
+            while (rs.next()) {
+                long day = rs.getLong("DAY");
+                long cnt = rs.getLong("COUNT");
+                map.put(day, cnt);
+                logger.debug("  " + day + "  " + cnt);
             }
         } catch (SQLException e) {
             logger.error("LogfileLoader.getHistogram: " + e.getMessage(), e);
@@ -327,9 +314,7 @@ public class LogfileLoader extends Thread {
     }
 
     private void initializeNextid() {
-        Connection conn = null;
-        try {
-            conn = db.getConnection();
+        try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
             RLEBitSet nbs = new RLEBitSet();
             try (Statement stmt = conn.createStatement()) {
                 // Build a bitset of all records in the LOG_RECORDS table
@@ -341,8 +326,8 @@ public class LogfileLoader extends Thread {
                     try (ResultSet rs = stmt.executeQuery(sql)) {
                         goAgain = false;
                         while (rs.next()) {
-                            long n = rs.getLong("RECORD_ID");
-                            nbs.set(n);
+                            long recordId = rs.getLong("RECORD_ID");
+                            nbs.set(recordId);
                             goAgain = true;
                         }
                     }
@@ -355,8 +340,8 @@ public class LogfileLoader extends Thread {
             RLEBitSet idset = new RLEBitSet();
             idset.set(setStart, setStart + SET_SIZE);
             tbs.and(idset);
-            long t = tbs.length();
-            nextId = (t == 0) ? setStart : (t - 1);
+            long bitLength = tbs.length();
+            nextId = (bitLength == 0) ? setStart : (bitLength - 1);
             if (nextId >= setStart + SET_SIZE) {
                 // Handle wraparound, when the IDs reach the end of our "range"
                 Long[] last = null;
@@ -366,29 +351,27 @@ public class LogfileLoader extends Thread {
                 }
                 if (last != null) {
                     tbs.clear(last[0], last[1] + 1);
-                    t = tbs.length();
-                    nextId = (t == 0) ? setStart : (t - 1);
+                    bitLength = tbs.length();
+                    nextId = (bitLength == 0) ? setStart : (bitLength - 1);
                 }
             }
             logger.debug(String.format("LogfileLoader.initializeNextid, next ID is %d (%x)", nextId, nextId));
         } catch (SQLException e) {
             logger.error("LogfileLoader.initializeNextid: " + e.getMessage(), e);
-        } finally {
-            db.release(conn);
         }
     }
 
     @SuppressWarnings("resource")
-    int[] process(File f) {
+    int[] process(File file) {
         int ok = 0;
         int total = 0;
-        try {
-            Connection conn = db.getConnection();
-            PreparedStatement ps = conn.prepareStatement(INSERT_SQL);
-            Reader r = f.getPath().endsWith(".gz")
-                ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))
-                : new FileReader(f);
-            try (LineNumberReader in = new LineNumberReader(r)) {
+        try (Connection conn = ProvDbUtils.getInstance().getConnection();
+            PreparedStatement ps = conn.prepareStatement(
+                "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
+            Reader reader = file.getPath().endsWith(".gz")
+                ? new InputStreamReader(new GZIPInputStream(new FileInputStream(file)))
+                : new FileReader(file);
+            try (LineNumberReader in = new LineNumberReader(reader)) {
                 String line;
                 while ((line = in.readLine()) != null) {
                     try {
@@ -425,10 +408,8 @@ public class LogfileLoader extends Thread {
                     total++;
                 }
             }
-            ps.close();
-            db.release(conn);
         } catch (SQLException | IOException e) {
-            logger.warn("PROV8007 Exception reading " + f + ": " + e);
+            logger.warn("PROV8007 Exception reading " + file + ": " + e);
         }
         return new int[]{ok, total};
     }
@@ -456,11 +437,12 @@ public class LogfileLoader extends Thread {
             }
             if ("EXP".equals(rtype) && pp.length == 11) {
                 // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
-                ExpiryRecord e = new ExpiryRecord(pp);
-                if ("other".equals(e.getReason())) {
-                    logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: " + e.getPublishId());
+                ExpiryRecord expiryRecord = new ExpiryRecord(pp);
+                if ("other".equals(expiryRecord.getReason())) {
+                    logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: "
+                        + expiryRecord.getPublishId());
                 }
-                return new Loadable[]{e};
+                return new Loadable[]{expiryRecord};
             }
             if ("PBF".equals(rtype) && pp.length == 12) {
                 // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
@@ -471,7 +453,8 @@ public class LogfileLoader extends Thread {
                 return new Loadable[]{new DeliveryExtraRecord(pp)};
             }
             if ("LOG".equals(rtype) && (pp.length == 19 || pp.length == 20)) {
-                // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
+                // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|
+                // feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
                 return new Loadable[]{new LogRecord(pp)};
             }
         }
@@ -482,9 +465,9 @@ public class LogfileLoader extends Thread {
     /**
      * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
      *
-     * @param a ignored
+     * @param str ignored
      */
-    public static void main(String[] a) throws InterruptedException {
+    public static void main(String[] str) throws InterruptedException {
         LogfileLoader.getLoader();
         Thread.sleep(200000L);
     }