X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=datarouter-prov%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdatarouter%2Fprovisioning%2Futils%2FLogfileLoader.java;h=ab1a3a78eb72d1ee39a68820d7f443600fb4cf82;hb=5da50e90e6c78700d48d7468849d4a1599a249b3;hp=b19bee10b310ad19b9e00bae14d30fb8354643b1;hpb=e4b20cc6f7c31f48ddd0de5bcd054b09a35cd510;p=dmaap%2Fdatarouter.git
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java
index b19bee10..ab1a3a78 100644
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java
@@ -7,9 +7,9 @@
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
- * *
+ * *
* * http://www.apache.org/licenses/LICENSE-2.0
- * *
+ * *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -59,7 +59,7 @@ import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
/**
* This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.
- * These logfiles are loaded into the MySQL LOG_RECORDS table. In a running provisioning server, there should only be
+ * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be
* two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at
* startup to load the old (1.0) style log tables into LOG_RECORDS;
*
This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
@@ -70,479 +70,431 @@ import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
* @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
*/
public class LogfileLoader extends Thread {
- /** Default number of log records to keep when pruning. Keep 10M by default. */
- public static final long DEFAULT_LOG_RETENTION = 10000000L;
- /** NOT USED: Percentage of free space required before old records are removed. */
- public static final int REQUIRED_FREE_PCT = 20;
+ /**
+ * Default number of log records to keep when pruning. Keep 10M by default.
+ */
+ public static final long DEFAULT_LOG_RETENTION = 10000000L;
+ /**
+ * NOT USED: Percentage of free space required before old records are removed.
+ */
+ public static final int REQUIRED_FREE_PCT = 20;
+
+ /**
+ * This is a singleton -- there is only one LogfileLoader object in the server
+ */
+ private static LogfileLoader p;
+
+ /**
+ * Get the singleton LogfileLoader object, and start it if it is not running.
+ *
+ * @return the LogfileLoader
+ */
+ public static synchronized LogfileLoader getLoader() {
+ if (p == null)
+ p = new LogfileLoader();
+ if (!p.isAlive())
+ p.start();
+ return p;
+ }
+
+ /**
+ * The PreparedStatement which is loaded by a Loadable.
+ */
+ public static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ /**
+ * Each server can assign this many IDs
+ */
+ private static final long SET_SIZE = (1L << 56);
+
+ private final Logger logger;
+ private final DB db;
+ private final String spooldir;
+ private final long set_start;
+ private final long set_end;
+ private RLEBitSet seq_set;
+ private long nextid;
+ private boolean idle;
+
+ private LogfileLoader() {
+ this.logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");
+ this.db = new DB();
+ this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
+ this.set_start = getIdRange();
+ this.set_end = set_start + SET_SIZE - 1;
+ this.seq_set = new RLEBitSet();
+ this.nextid = 0;
+ this.idle = false;
+
+ // This is a potentially lengthy operation, so has been moved to run()
+ //initializeNextid();
+ this.setDaemon(true);
+ this.setName("LogfileLoader");
+ }
+
+ private long getIdRange() {
+ long n;
+ if (BaseServlet.isInitialActivePOD())
+ n = 0;
+ else if (BaseServlet.isInitialStandbyPOD())
+ n = SET_SIZE;
+ else
+ n = SET_SIZE * 2;
+ String r = String.format("[%X .. %X]", n, n + SET_SIZE - 1);
+ logger.debug("This server shall assign RECORD_IDs in the range " + r);
+ return n;
+ }
+
+ /**
+ * Return the bit set representing the record ID's that are loaded in this database.
+ *
+ * @return the bit set
+ */
+ public RLEBitSet getBitSet() {
+ return seq_set;
+ }
- /** This is a singleton -- there is only one LogfileLoader object in the server */
- private static LogfileLoader p;
+ /**
+ * True if the LogfileLoader is currently waiting for work.
+ *
+ * @return true if idle
+ */
+ public boolean isIdle() {
+ return idle;
+ }
- /**
- * Get the singleton LogfileLoader object, and start it if it is not running.
- * @return the LogfileLoader
- */
- public static synchronized LogfileLoader getLoader() {
- if (p == null)
- p = new LogfileLoader();
- if (!p.isAlive())
- p.start();
- return p;
- }
+ /**
+ * Run continuously to look for new logfiles in the spool directory and import them into the DB.
+ * The spool is checked once per second. If free space on the MariaDB filesystem falls below
+ * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS
+ * table is compacted until free space rises above the threshold.
+ */
+ @Override
+ public void run() {
+ initializeNextid(); // moved from the constructor
+ while (true) {
+ try {
+ File dirfile = new File(spooldir);
+ while (true) {
+ // process IN files
+ File[] infiles = dirfile.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith("IN.");
+ }
+ });
- /** The PreparedStatement which is loaded by a Loadable. */
- public static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
- /** Each server can assign this many IDs */
- private static final long SET_SIZE = (1L << 56);
+ if (infiles.length == 0) {
+ idle = true;
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ idle = false;
+ } else {
+ // Remove old rows
+ if (pruneRecords()) {
+ // Removed at least some entries, recompute the bit map
+ initializeNextid();
+ }
- private final Logger logger;
- private final DB db;
- private final String spooldir;
- private final long set_start;
- private final long set_end;
- private RLEBitSet seq_set;
- private long nextid;
- private boolean idle;
+ // Process incoming logfiles
+ for (File f : infiles) {
+ if (logger.isDebugEnabled())
+ logger.debug("PROV8001 Starting " + f + " ...");
+ long time = System.currentTimeMillis();
+ int[] n = process(f);
+ time = System.currentTimeMillis() - time;
+ logger.info(String
+ .format("PROV8000 Processed %s in %d ms; %d of %d records.",
+ f.toString(), time, n[0], n[1]));
+ f.delete();
+ }
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
+ }
+ }
+ }
- private LogfileLoader() {
- this.logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");
- this.db = new DB();
- this.spooldir = db.getProperties().getProperty("com.att.research.datarouter.provserver.spooldir");
- this.set_start = getIdRange();
- this.set_end = set_start + SET_SIZE - 1;
- this.seq_set = new RLEBitSet();
- this.nextid = 0;
- this.idle = false;
+ private boolean pruneRecords() {
+ boolean did1 = false;
+ long count = countRecords();
+ long threshold = DEFAULT_LOG_RETENTION;
+ Parameters param = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
+ if (param != null) {
+ try {
+ long n = Long.parseLong(param.getValue());
+ // This check is to prevent inadvertent errors from wiping the table out
+ if (n > 1000000L)
+ threshold = n;
+ } catch (NumberFormatException e) {
+ // ignore
+ }
+ }
+ logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
+ if (count > threshold) {
+ count -= threshold; // we need to remove this many records;
+ Map hist = getHistogram(); // histogram of records per day
+ // Determine the cutoff point to remove the needed number of records
+ long sum = 0;
+ long cutoff = 0;
+ for (Long day : new TreeSet(hist.keySet())) {
+ sum += hist.get(day);
+ cutoff = day;
+ if (sum >= count)
+ break;
+ }
+ cutoff++;
+ cutoff *= 86400000L; // convert day to ms
+ logger.debug(" Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
- // This is a potentially lengthy operation, so has been moved to run()
- //initializeNextid();
- this.setDaemon(true);
- this.setName("LogfileLoader");
- }
+ Connection conn = null;
+ try {
+ // Limit to a million at a time to avoid typing up the DB for too long.
+ conn = db.getConnection();
+ try(PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
+ ps.setLong(1, cutoff);
+ while (count > 0) {
+ if (!ps.execute()) {
+ int dcount = ps.getUpdateCount();
+ count -= dcount;
+ logger.debug(" " + dcount + " rows deleted.");
+ did1 |= (dcount != 0);
+ if (dcount == 0)
+ count = 0; // prevent inf. loops
+ } else {
+ count = 0; // shouldn't happen!
+ }
+ }
+ }
+ try(Statement stmt = conn.createStatement()) {
+ stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
+ }
+ } catch (SQLException e) {
+ System.err.println(e);
+ logger.error(e);
+ } finally {
+ db.release(conn);
+ }
+ }
+ return did1;
+ }
- private long getIdRange() {
- long n;
- if (BaseServlet.isInitialActivePOD())
- n = 0;
- else if (BaseServlet.isInitialStandbyPOD())
- n = SET_SIZE;
- else
- n = SET_SIZE * 2;
- String r = String.format("[%X .. %X]", n, n+SET_SIZE-1);
- logger.debug("This server shall assign RECORD_IDs in the range "+r);
- return n;
- }
- /**
- * Return the bit set representing the record ID's that are loaded in this database.
- * @return the bit set
- */
- public RLEBitSet getBitSet() {
- return seq_set;
- }
- /**
- * True if the LogfileLoader is currently waiting for work.
- * @return true if idle
- */
- public boolean isIdle() {
- return idle;
- }
- /**
- * Run continuously to look for new logfiles in the spool directory and import them into the DB.
- * The spool is checked once per second. If free space on the MySQL filesystem falls below
- * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS
- * table is compacted until free space rises above the threshold.
- */
- @Override
- public void run() {
- initializeNextid(); // moved from the constructor
- while (true) {
- try {
- File dirfile = new File(spooldir);
- while (true) {
- // process IN files
- File[] infiles = dirfile.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith("IN.");
- }
- });
+ private long countRecords() {
+ long count = 0;
+ Connection conn = null;
+ try {
+ conn = db.getConnection();
+ try(Statement stmt = conn.createStatement()) {
+ try(ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) {
+ if (rs.next()) {
+ count = rs.getLong("COUNT");
+ }
+ }
+ }
+ } catch (SQLException e) {
+ System.err.println(e);
+ logger.error(e);
+ } finally {
+ db.release(conn);
+ }
+ return count;
+ }
- if (infiles.length == 0) {
- idle = true;
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- }
- idle = false;
- } else {
- // Remove old rows
- if (pruneRecords()) {
- // Removed at least some entries, recompute the bit map
- initializeNextid();
- }
+ private Map getHistogram() {
+ Map map = new HashMap();
+ Connection conn = null;
+ try {
+ logger.debug(" LOG_RECORD table histogram...");
+ conn = db.getConnection();
+ try(Statement stmt = conn.createStatement()) {
+ try(ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) {
+ while (rs.next()) {
+ long day = rs.getLong("DAY");
+ long cnt = rs.getLong("COUNT");
+ map.put(day, cnt);
+ logger.debug(" " + day + " " + cnt);
+ }
+ }
+ }
+ } catch (SQLException e) {
+ System.err.println(e);
+ logger.error(e);
+ } finally {
+ db.release(conn);
+ }
+ return map;
+ }
- // Process incoming logfiles
- for (File f : infiles) {
- if (logger.isDebugEnabled())
- logger.debug("PROV8001 Starting " + f + " ...");
- long time = System.currentTimeMillis();
- int[] n = process(f);
- time = System.currentTimeMillis() - time;
- logger.info(String
- .format("PROV8000 Processed %s in %d ms; %d of %d records.",
- f.toString(), time, n[0], n[1]));
- f.delete();
- }
- }
- }
- } catch (Exception e) {
- logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
- e.printStackTrace();
- }
- }
- }
- private boolean pruneRecords() {
- boolean did1 = false;
- long count = countRecords();
- long threshold = DEFAULT_LOG_RETENTION;
- Parameters param = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
- if (param != null) {
- try {
- long n = Long.parseLong(param.getValue());
- // This check is to prevent inadvertent errors from wiping the table out
- if (n > 1000000L)
- threshold = n;
- } catch (NumberFormatException e) {
- // ignore
- }
- }
- logger.debug("Pruning LOG_RECORD table: records in DB="+count+", threshold="+threshold);
- if (count > threshold) {
- count -= threshold; // we need to remove this many records;
- Map hist = getHistogram(); // histogram of records per day
- // Determine the cutoff point to remove the needed number of records
- long sum = 0;
- long cutoff = 0;
- for (Long day : new TreeSet(hist.keySet())) {
- sum += hist.get(day);
- cutoff = day;
- if (sum >= count)
- break;
- }
- cutoff++;
- cutoff *= 86400000L; // convert day to ms
- logger.debug(" Pruning records older than="+(cutoff/86400000L)+" ("+new Date(cutoff)+")");
+ private void initializeNextid() {
+ Connection conn = null;
+ try {
+ conn = db.getConnection();
+ RLEBitSet nbs = new RLEBitSet();
+ try(Statement stmt = conn.createStatement()) {
+ // Build a bitset of all records in the LOG_RECORDS table
+ // We need to run this SELECT in stages, because otherwise we run out of memory!
+ final long stepsize = 6000000L;
+ boolean go_again = true;
+ for (long i = 0; go_again; i += stepsize) {
+ String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
+ try (ResultSet rs = stmt.executeQuery(sql)) {
+ go_again = false;
+ while (rs.next()) {
+ long n = rs.getLong("RECORD_ID");
+ nbs.set(n);
+ go_again = true;
+ }
+ }
+ }
+ }
+ seq_set = nbs;
+ // Compare with the range for this server
+ // Determine the next ID for this set of record IDs
+ RLEBitSet tbs = (RLEBitSet) nbs.clone();
+ RLEBitSet idset = new RLEBitSet();
+ idset.set(set_start, set_start + SET_SIZE);
+ tbs.and(idset);
+ long t = tbs.length();
+ nextid = (t == 0) ? set_start : (t - 1);
+ if (nextid >= set_start + SET_SIZE) {
+ // Handle wraparound, when the IDs reach the end of our "range"
+ Long[] last = null;
+ Iterator li = tbs.getRangeIterator();
+ while (li.hasNext()) {
+ last = li.next();
+ }
+ if (last != null) {
+ tbs.clear(last[0], last[1] + 1);
+ t = tbs.length();
+ nextid = (t == 0) ? set_start : (t - 1);
+ }
+ }
+ logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextid, nextid));
+ } catch (SQLException e) {
+ System.err.println(e);
+ logger.error(e);
+ } finally {
+ db.release(conn);
+ }
+ }
- Connection conn = null;
- try {
- // Limit to a million at a time to avoid typing up the DB for too long.
- conn = db.getConnection();
- PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000");
- ps.setLong(1, cutoff);
- while (count > 0) {
- if (!ps.execute()) {
- int dcount = ps.getUpdateCount();
- count -= dcount;
- logger.debug(" "+dcount+" rows deleted.");
- did1 |= (dcount!=0);
- if (dcount == 0)
- count = 0; // prevent inf. loops
- } else {
- count = 0; // shouldn't happen!
- }
- }
- ps.close();
- Statement stmt = conn.createStatement();
- stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
- stmt.close();
- } catch (SQLException e) {
- System.err.println(e);
- e.printStackTrace();
- } finally {
- db.release(conn);
- }
- }
- return did1;
- }
- private long countRecords() {
- long count = 0;
- Connection conn = null;
- try {
- conn = db.getConnection();
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS");
- if (rs.next()) {
- count = rs.getLong("COUNT");
- }
- rs.close();
- stmt.close();
- } catch (SQLException e) {
- System.err.println(e);
- e.printStackTrace();
- } finally {
- db.release(conn);
- }
- return count;
- }
- private Map getHistogram() {
- Map map = new HashMap();
- Connection conn = null;
- try {
- logger.debug(" LOG_RECORD table histogram...");
- conn = db.getConnection();
- Statement stmt = conn.createStatement();
- ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY");
- while (rs.next()) {
- long day = rs.getLong("DAY");
- long cnt = rs.getLong("COUNT");
- map.put(day, cnt);
- logger.debug(" "+day + " "+cnt);
- }
- rs.close();
- stmt.close();
- } catch (SQLException e) {
- System.err.println(e);
- e.printStackTrace();
- } finally {
- db.release(conn);
- }
- return map;
- }
- private void initializeNextid() {
- Connection conn = null;
- try {
- conn = db.getConnection();
- Statement stmt = conn.createStatement();
- // Build a bitset of all records in the LOG_RECORDS table
- // We need to run this SELECT in stages, because otherwise we run out of memory!
- RLEBitSet nbs = new RLEBitSet();
- final long stepsize = 6000000L;
- boolean go_again = true;
- for (long i = 0; go_again; i += stepsize) {
- String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
- ResultSet rs = stmt.executeQuery(sql);
- go_again = false;
- while (rs.next()) {
- long n = rs.getLong("RECORD_ID");
- nbs.set(n);
- go_again = true;
- }
- rs.close();
- }
- stmt.close();
- seq_set = nbs;
+ @SuppressWarnings("resource")
+ private int[] process(File f) {
+ int ok = 0, total = 0;
+ try {
+ Connection conn = db.getConnection();
+ PreparedStatement ps = conn.prepareStatement(INSERT_SQL);
+ Reader r = f.getPath().endsWith(".gz")
+ ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))
+ : new FileReader(f);
+ try(LineNumberReader in = new LineNumberReader(r)) {
+ String line;
+ while ((line = in.readLine()) != null) {
+ try {
+ for (Loadable rec : buildRecords(line)) {
+ rec.load(ps);
+ if (rec instanceof LogRecord) {
+ LogRecord lr = ((LogRecord) rec);
+ if (!seq_set.get(lr.getRecordId())) {
+ ps.executeUpdate();
+ seq_set.set(lr.getRecordId());
+ } else
+ logger.debug("Duplicate record ignored: " + lr.getRecordId());
+ } else {
+ if (++nextid > set_end)
+ nextid = set_start;
+ ps.setLong(18, nextid);
+ ps.executeUpdate();
+ seq_set.set(nextid);
+ }
+ ps.clearParameters();
+ ok++;
+ }
+ } catch (SQLException e) {
+ logger.warn("PROV8003 Invalid value in record: " + line);
+ logger.debug(e);
+ } catch (NumberFormatException e) {
+ logger.warn("PROV8004 Invalid number in record: " + line);
+ logger.debug(e);
+ } catch (ParseException e) {
+ logger.warn("PROV8005 Invalid date in record: " + line);
+ logger.debug(e);
+ } catch (Exception e) {
+ logger.warn("PROV8006 Invalid pattern in record: " + line);
+ logger.debug(e);
+ }
+ total++;
+ }
+ }
+ ps.close();
+ db.release(conn);
+ conn = null;
+ } catch (FileNotFoundException e) {
+ logger.warn("PROV8007 Exception reading " + f + ": " + e);
+ } catch (IOException e) {
+ logger.warn("PROV8007 Exception reading " + f + ": " + e);
+ } catch (SQLException e) {
+ logger.warn("PROV8007 Exception reading " + f + ": " + e);
+ }
+ return new int[]{ok, total};
+ }
- // Compare with the range for this server
- // Determine the next ID for this set of record IDs
- RLEBitSet tbs = (RLEBitSet) nbs.clone();
- RLEBitSet idset = new RLEBitSet();
- idset.set(set_start, set_start+SET_SIZE);
- tbs.and(idset);
- long t = tbs.length();
- nextid = (t == 0) ? set_start : (t - 1);
- if (nextid >= set_start+SET_SIZE) {
- // Handle wraparound, when the IDs reach the end of our "range"
- Long[] last = null;
- Iterator li = tbs.getRangeIterator();
- while (li.hasNext()) {
- last = li.next();
- }
- if (last != null) {
- tbs.clear(last[0], last[1]+1);
- t = tbs.length();
- nextid = (t == 0) ? set_start : (t - 1);
- }
- }
- logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextid, nextid));
- } catch (SQLException e) {
- System.err.println(e);
- e.printStackTrace();
- } finally {
- db.release(conn);
- }
- }
-// OLD CODE - commented here for historical purposes
-//
-// private boolean pruneRecordsOldAlgorithm() {
-// // Determine space available -- available space must be at least 20% under /opt/app/mysql
-// int pct = getFreePercentage();
-// boolean did1 = false;
-// while (pct < REQUIRED_FREE_PCT) {
-// logger.info("PROV8008: Free space is " + pct + "% - removing old log entries");
-// boolean didit = removeOldestEntries();
-// pct = didit ? getFreePercentage() : 100; // don't loop endlessly
-// did1 |= didit;
-// }
-// return did1;
-// }
-// private int getFreePercentage() {
-// FileSystem fs = (Paths.get("/opt/app/mysql")).getFileSystem();
-// long total = 0;
-// long avail = 0;
-// try {
-// for (FileStore store : fs.getFileStores()) {
-// total += store.getTotalSpace();
-// avail += store.getUsableSpace();
-// }
-// } catch (IOException e) {
-// }
-// try { fs.close(); } catch (Exception e) { }
-// return (int)((avail * 100) / total);
-// }
-// private boolean removeOldestEntries() {
-// // Remove the last days worth of entries
-// Connection conn = null;
-// try {
-// conn = db.getConnection();
-// Statement stmt = conn.createStatement();
-// ResultSet rs = stmt.executeQuery("select min(event_time) as MIN from LOG_RECORDS");
-// if (rs != null) {
-// if (rs.next()) {
-// // Compute the end of the first day of logs
-// long first = rs.getLong("MIN");
-// Calendar cal = new GregorianCalendar();
-// cal.setTime(new Date(first));
-// cal.add(Calendar.DAY_OF_YEAR, 1);
-// cal.set(Calendar.HOUR_OF_DAY, 0);
-// cal.set(Calendar.MINUTE, 0);
-// cal.set(Calendar.SECOND, 0);
-// cal.set(Calendar.MILLISECOND, 0);
-// if (!stmt.execute("delete from LOG_RECORDS where event_time < " + cal.getTimeInMillis())) {
-// int count = stmt.getUpdateCount();
-// logger.info("PROV0009: Removed "+count+" old log entries.");
-// stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
-// }
-// rs.close();
-// stmt.close();
-// return true;
-// }
-// rs.close();
-// }
-// stmt.close();
-// } catch (SQLException e) {
-// System.err.println(e);
-// e.printStackTrace();
-// } finally {
-// db.release(conn);
-// }
-// return false;
-// }
- @SuppressWarnings("resource")
- private int[] process(File f) {
- int ok = 0, total = 0;
- try {
- Connection conn = db.getConnection();
- PreparedStatement ps = conn.prepareStatement(INSERT_SQL);
- Reader r = f.getPath().endsWith(".gz")
- ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))
- : new FileReader(f);
- LineNumberReader in = new LineNumberReader(r);
- String line;
- while ((line = in.readLine()) != null) {
- try {
- for (Loadable rec : buildRecords(line)) {
- rec.load(ps);
- if (rec instanceof LogRecord) {
- LogRecord lr = ((LogRecord)rec);
- if (!seq_set.get(lr.getRecordId())) {
- ps.executeUpdate();
- seq_set.set(lr.getRecordId());
- } else
- logger.debug("Duplicate record ignored: "+lr.getRecordId());
- } else {
- if (++nextid > set_end)
- nextid = set_start;
- ps.setLong(18, nextid);
- ps.executeUpdate();
- seq_set.set(nextid);
- }
- ps.clearParameters();
- ok++;
- }
- } catch (SQLException e) {
- logger.warn("PROV8003 Invalid value in record: "+line);
- logger.debug(e);
- e.printStackTrace();
- } catch (NumberFormatException e) {
- logger.warn("PROV8004 Invalid number in record: "+line);
- logger.debug(e);
- e.printStackTrace();
- } catch (ParseException e) {
- logger.warn("PROV8005 Invalid date in record: "+line);
- logger.debug(e);
- e.printStackTrace();
- } catch (Exception e) {
- logger.warn("PROV8006 Invalid pattern in record: "+line);
- logger.debug(e);
- e.printStackTrace();
- }
- total++;
- }
- in.close();
- ps.close();
- db.release(conn);
- conn = null;
- } catch (FileNotFoundException e) {
- logger.warn("PROV8007 Exception reading "+f+": "+e);
- } catch (IOException e) {
- logger.warn("PROV8007 Exception reading "+f+": "+e);
- } catch (SQLException e) {
- logger.warn("PROV8007 Exception reading "+f+": "+e);
- }
- return new int[] { ok, total };
- }
- private Loadable[] buildRecords(String line) throws ParseException {
- String[] pp = line.split("\\|");
- if (pp != null && pp.length >= 7) {
- String rtype = pp[1].toUpperCase();
- if (rtype.equals("PUB") && pp.length == 11) {
- // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
- return new Loadable[] { new PublishRecord(pp) };
- }
- if (rtype.equals("DEL") && pp.length == 12) {
- // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
- String[] subs = pp[4].split("\\s+");
- if (subs != null) {
- Loadable[] rv = new Loadable[subs.length];
- for (int i = 0; i < subs.length; i++) {
- // create a new record for each individual sub
- pp[4] = subs[i];
- rv[i] = new DeliveryRecord(pp);
- }
- return rv;
- }
- }
- if (rtype.equals("EXP") && pp.length == 11) {
- // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
- ExpiryRecord e = new ExpiryRecord(pp);
- if (e.getReason().equals("other"))
- logger.info("Invalid reason '"+pp[9]+"' changed to 'other' for record: "+e.getPublishId());
- return new Loadable[] { e };
- }
- if (rtype.equals("PBF") && pp.length == 12) {
- // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
- return new Loadable[] { new PubFailRecord(pp) };
- }
- if (rtype.equals("DLX") && pp.length == 7) {
- // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
- return new Loadable[] { new DeliveryExtraRecord(pp) };
- }
- if (rtype.equals("LOG") && (pp.length == 19 || pp.length == 20)) {
- // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
- return new Loadable[] { new LogRecord(pp) };
- }
- }
- logger.warn("PROV8002 bad record: "+line);
- return new Loadable[0];
- }
+ private Loadable[] buildRecords(String line) throws ParseException {
+ String[] pp = line.split("\\|");
+ if (pp != null && pp.length >= 7) {
+ String rtype = pp[1].toUpperCase();
+ if (rtype.equals("PUB") && pp.length == 11) {
+ // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
+ return new Loadable[]{new PublishRecord(pp)};
+ }
+ if (rtype.equals("DEL") && pp.length == 12) {
+ // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
+ String[] subs = pp[4].split("\\s+");
+ if (subs != null) {
+ Loadable[] rv = new Loadable[subs.length];
+ for (int i = 0; i < subs.length; i++) {
+ // create a new record for each individual sub
+ pp[4] = subs[i];
+ rv[i] = new DeliveryRecord(pp);
+ }
+ return rv;
+ }
+ }
+ if (rtype.equals("EXP") && pp.length == 11) {
+ // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
+ ExpiryRecord e = new ExpiryRecord(pp);
+ if (e.getReason().equals("other"))
+ logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: " + e.getPublishId());
+ return new Loadable[]{e};
+ }
+ if (rtype.equals("PBF") && pp.length == 12) {
+ // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
+ return new Loadable[]{new PubFailRecord(pp)};
+ }
+ if (rtype.equals("DLX") && pp.length == 7) {
+ // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
+ return new Loadable[]{new DeliveryExtraRecord(pp)};
+ }
+ if (rtype.equals("LOG") && (pp.length == 19 || pp.length == 20)) {
+ // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
+ return new Loadable[]{new LogRecord(pp)};
+ }
+ }
+ logger.warn("PROV8002 bad record: " + line);
+ return new Loadable[0];
+ }
- /**
- * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
- * @param a ignored
- * @throws InterruptedException
- */
- public static void main(String[] a) throws InterruptedException {
- LogfileLoader.getLoader();
- Thread.sleep(200000L);
- }
+ /**
+ * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
+ *
+ * @param a ignored
+ * @throws InterruptedException
+ */
+ public static void main(String[] a) throws InterruptedException {
+ LogfileLoader.getLoader();
+ Thread.sleep(200000L);
+ }
}