Refactor Prov DB handling
[dmaap/datarouter.git] datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java
index 45b8788..18ffea0 100644
-/*******************************************************************************\r
- * ============LICENSE_START==================================================\r
- * * org.onap.dmaap\r
- * * ===========================================================================\r
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * * ===========================================================================\r
- * * Licensed under the Apache License, Version 2.0 (the "License");\r
- * * you may not use this file except in compliance with the License.\r
- * * You may obtain a copy of the License at\r
- * * \r
- *  *      http://www.apache.org/licenses/LICENSE-2.0\r
- * * \r
- *  * Unless required by applicable law or agreed to in writing, software\r
- * * distributed under the License is distributed on an "AS IS" BASIS,\r
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * * See the License for the specific language governing permissions and\r
- * * limitations under the License.\r
- * * ============LICENSE_END====================================================\r
- * *\r
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
- * *\r
- ******************************************************************************/\r
-\r
-\r
-package org.onap.dmaap.datarouter.provisioning.utils;\r
-\r
-import java.io.File;\r
-import java.io.FileInputStream;\r
-import java.io.FileNotFoundException;\r
-import java.io.FileReader;\r
-import java.io.FilenameFilter;\r
-import java.io.IOException;\r
-import java.io.InputStreamReader;\r
-import java.io.LineNumberReader;\r
-import java.io.Reader;\r
-import java.sql.Connection;\r
-import java.sql.PreparedStatement;\r
-import java.sql.ResultSet;\r
-import java.sql.SQLException;\r
-import java.sql.Statement;\r
-import java.text.ParseException;\r
-import java.util.Date;\r
-import java.util.HashMap;\r
-import java.util.Iterator;\r
-import java.util.Map;\r
-import java.util.TreeSet;\r
-import java.util.zip.GZIPInputStream;\r
-\r
-import org.apache.log4j.Logger;\r
-import org.onap.dmaap.datarouter.provisioning.BaseServlet;\r
-import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;\r
-import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;\r
-import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;\r
-import org.onap.dmaap.datarouter.provisioning.beans.Loadable;\r
-import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;\r
-import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
-import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;\r
-import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;\r
-\r
-/**\r
- * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.\r
- * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be\r
- * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at\r
- * startup to load the old (1.0) style log tables into LOG_RECORDS;\r
- * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the\r
- * database.\r
- * This bit set is used to synchronize between provisioning servers.</p>\r
- *\r
- * @author Robert Eby\r
- * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $\r
- */\r
-public class LogfileLoader extends Thread {\r
-       /** Default number of log records to keep when pruning.  Keep 10M by default. */\r
-       public static final long DEFAULT_LOG_RETENTION = 10000000L;\r
-       /** NOT USED: Percentage of free space required before old records are removed. */\r
-       public static final int REQUIRED_FREE_PCT = 20;\r
-\r
-       /** This is a singleton -- there is only one LogfileLoader object in the server */\r
-       private static LogfileLoader p;\r
-\r
-       /**\r
-        * Get the singleton LogfileLoader object, and start it if it is not running.\r
-        * @return the LogfileLoader\r
-        */\r
-       public static synchronized LogfileLoader getLoader() {\r
-               if (p == null)\r
-                       p = new LogfileLoader();\r
-               if (!p.isAlive())\r
-                       p.start();\r
-               return p;\r
-       }\r
-\r
-       /** The PreparedStatement which is loaded by a <i>Loadable</i>. */\r
-       public static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";\r
-       /** Each server can assign this many IDs */\r
-       private static final long SET_SIZE = (1L << 56);\r
-\r
-       private final Logger logger;\r
-       private final DB db;\r
-       private final String spooldir;\r
-       private final long set_start;\r
-       private final long set_end;\r
-       private RLEBitSet seq_set;\r
-       private long nextid;\r
-       private boolean idle;\r
-\r
-       private LogfileLoader() {\r
-               this.logger   = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");\r
-               this.db       = new DB();\r
-               this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");\r
-               this.set_start = getIdRange();\r
-               this.set_end   = set_start + SET_SIZE - 1;\r
-               this.seq_set  = new RLEBitSet();\r
-               this.nextid   = 0;\r
-               this.idle     = false;\r
-\r
-               // This is a potentially lengthy operation, so has been moved to run()\r
-               //initializeNextid();\r
-               this.setDaemon(true);\r
-               this.setName("LogfileLoader");\r
-       }\r
-\r
-       private long getIdRange() {\r
-               long n;\r
-               if (BaseServlet.isInitialActivePOD())\r
-                       n = 0;\r
-               else if (BaseServlet.isInitialStandbyPOD())\r
-                       n = SET_SIZE;\r
-               else\r
-                       n = SET_SIZE * 2;\r
-               String r = String.format("[%X .. %X]", n, n+SET_SIZE-1);\r
-               logger.debug("This server shall assign RECORD_IDs in the range "+r);\r
-               return n;\r
-       }\r
-       /**\r
-        * Return the bit set representing the record ID's that are loaded in this database.\r
-        * @return the bit set\r
-        */\r
-       public RLEBitSet getBitSet() {\r
-               return seq_set;\r
-       }\r
-       /**\r
-        * True if the LogfileLoader is currently waiting for work.\r
-        * @return true if idle\r
-        */\r
-       public boolean isIdle() {\r
-               return idle;\r
-       }\r
-       /**\r
-        * Run continuously to look for new logfiles in the spool directory and import them into the DB.\r
-        * The spool is checked once per second.  If free space on the MariaDB filesystem falls below\r
-        * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS\r
-        * table is compacted until free space rises above the threshold.\r
-        */\r
-       @Override\r
-       public void run() {\r
-               initializeNextid();     // moved from the constructor\r
-               while (true) {\r
-                       try {\r
-                               File dirfile = new File(spooldir);\r
-                               while (true) {\r
-                                       // process IN files\r
-                                       File[] infiles = dirfile.listFiles(new FilenameFilter() {\r
-                                               @Override\r
-                                               public boolean accept(File dir, String name) {\r
-                                                       return name.startsWith("IN.");\r
-                                               }\r
-                                       });\r
-\r
-                                       if (infiles.length == 0) {\r
-                                               idle = true;\r
-                                               try {\r
-                                                       Thread.sleep(1000L);\r
-                                               } catch (InterruptedException e) {\r
-                                               }\r
-                                               idle = false;\r
-                                       } else {\r
-                                               // Remove old rows\r
-                                               if (pruneRecords()) {\r
-                                                       // Removed at least some entries, recompute the bit map\r
-                                                       initializeNextid();\r
-                                               }\r
-\r
-                                               // Process incoming logfiles\r
-                                               for (File f : infiles) {\r
-                                                       if (logger.isDebugEnabled())\r
-                                                               logger.debug("PROV8001 Starting " + f + " ...");\r
-                                                       long time = System.currentTimeMillis();\r
-                                                       int[] n = process(f);\r
-                                                       time = System.currentTimeMillis() - time;\r
-                                                       logger.info(String\r
-                                                                       .format("PROV8000 Processed %s in %d ms; %d of %d records.",\r
-                                                                                       f.toString(), time, n[0], n[1]));\r
-                                                       f.delete();\r
-                                               }\r
-                                       }\r
-                               }\r
-                       } catch (Exception e) {\r
-                               logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);\r
-                               e.printStackTrace();\r
-                       }\r
-               }\r
-       }\r
-       private boolean pruneRecords() {\r
-               boolean did1 = false;\r
-               long count = countRecords();\r
-               long threshold = DEFAULT_LOG_RETENTION;\r
-               Parameters param = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);\r
-               if (param != null) {\r
-                       try {\r
-                               long n = Long.parseLong(param.getValue());\r
-                               // This check is to prevent inadvertent errors from wiping the table out\r
-                               if (n > 1000000L)\r
-                                       threshold = n;\r
-                       } catch (NumberFormatException e) {\r
-                               // ignore\r
-                       }\r
-               }\r
-               logger.debug("Pruning LOG_RECORD table: records in DB="+count+", threshold="+threshold);\r
-               if (count > threshold) {\r
-                       count -= threshold;                                             // we need to remove this many records;\r
-                       Map<Long,Long> hist = getHistogram();   // histogram of records per day\r
-                       // Determine the cutoff point to remove the needed number of records\r
-                       long sum = 0;\r
-                       long cutoff = 0;\r
-                       for (Long day : new TreeSet<Long>(hist.keySet())) {\r
-                               sum += hist.get(day);\r
-                               cutoff = day;\r
-                               if (sum >= count)\r
-                                       break;\r
-                       }\r
-                       cutoff++;\r
-                       cutoff *= 86400000L;            // convert day to ms\r
-                       logger.debug("  Pruning records older than="+(cutoff/86400000L)+" ("+new Date(cutoff)+")");\r
-\r
-                       Connection conn = null;\r
-                       try {\r
-                               // Limit to a million at a time to avoid typing up the DB for too long.\r
-                               conn = db.getConnection();\r
-                               PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000");\r
-                               ps.setLong(1, cutoff);\r
-                               while (count > 0) {\r
-                                       if (!ps.execute()) {\r
-                                               int dcount = ps.getUpdateCount();\r
-                                               count -= dcount;\r
-                                               logger.debug("  "+dcount+" rows deleted.");\r
-                                               did1 |= (dcount!=0);\r
-                                               if (dcount == 0)\r
-                                                       count = 0;      // prevent inf. loops\r
-                                       } else {\r
-                                               count = 0;      // shouldn't happen!\r
-                                       }\r
-                               }\r
-                               ps.close();\r
-                               Statement stmt = conn.createStatement();\r
-                               stmt.execute("OPTIMIZE TABLE LOG_RECORDS");\r
-                               stmt.close();\r
-                       } catch (SQLException e) {\r
-                               System.err.println(e);\r
-                               e.printStackTrace();\r
-                       } finally {\r
-                               db.release(conn);\r
-                       }\r
-               }\r
-               return did1;\r
-       }\r
-       private long countRecords() {\r
-               long count = 0;\r
-               Connection conn = null;\r
-               try {\r
-                       conn = db.getConnection();\r
-                       Statement stmt = conn.createStatement();\r
-                       ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS");\r
-                       if (rs.next()) {\r
-                               count = rs.getLong("COUNT");\r
-                       }\r
-                       rs.close();\r
-                       stmt.close();\r
-               } catch (SQLException e) {\r
-                       System.err.println(e);\r
-                       e.printStackTrace();\r
-               } finally {\r
-                       db.release(conn);\r
-               }\r
-               return count;\r
-       }\r
-       private Map<Long,Long> getHistogram() {\r
-               Map<Long,Long> map = new HashMap<Long,Long>();\r
-               Connection conn = null;\r
-               try {\r
-                       logger.debug("  LOG_RECORD table histogram...");\r
-                       conn = db.getConnection();\r
-                       Statement stmt = conn.createStatement();\r
-                       ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY");\r
-                       while (rs.next()) {\r
-                               long day = rs.getLong("DAY");\r
-                               long cnt = rs.getLong("COUNT");\r
-                               map.put(day, cnt);\r
-                               logger.debug("  "+day + "  "+cnt);\r
-                       }\r
-                       rs.close();\r
-                       stmt.close();\r
-               } catch (SQLException e) {\r
-                       System.err.println(e);\r
-                       e.printStackTrace();\r
-               } finally {\r
-                       db.release(conn);\r
-               }\r
-               return map;\r
-       }\r
-       private void initializeNextid() {\r
-               Connection conn = null;\r
-               try {\r
-                       conn = db.getConnection();\r
-                       Statement stmt = conn.createStatement();\r
-                       // Build a bitset of all records in the LOG_RECORDS table\r
-                       // We need to run this SELECT in stages, because otherwise we run out of memory!\r
-                       RLEBitSet nbs = new RLEBitSet();\r
-                       final long stepsize = 6000000L;\r
-                       boolean go_again = true;\r
-                       for (long i = 0; go_again; i += stepsize) {\r
-                               String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);\r
-                               ResultSet rs = stmt.executeQuery(sql);\r
-                               go_again = false;\r
-                               while (rs.next()) {\r
-                                       long n = rs.getLong("RECORD_ID");\r
-                                       nbs.set(n);\r
-                                       go_again = true;\r
-                               }\r
-                               rs.close();\r
-                       }\r
-                       stmt.close();\r
-                       seq_set = nbs;\r
-\r
-                       // Compare with the range for this server\r
-                       // Determine the next ID for this set of record IDs\r
-                       RLEBitSet tbs = (RLEBitSet) nbs.clone();\r
-                       RLEBitSet idset = new RLEBitSet();\r
-                       idset.set(set_start, set_start+SET_SIZE);\r
-                       tbs.and(idset);\r
-                       long t = tbs.length();\r
-                       nextid = (t == 0) ? set_start : (t - 1);\r
-                       if (nextid >= set_start+SET_SIZE) {\r
-                               // Handle wraparound, when the IDs reach the end of our "range"\r
-                               Long[] last = null;\r
-                               Iterator<Long[]> li = tbs.getRangeIterator();\r
-                               while (li.hasNext()) {\r
-                                       last = li.next();\r
-                               }\r
-                               if (last != null) {\r
-                                       tbs.clear(last[0], last[1]+1);\r
-                                       t = tbs.length();\r
-                                       nextid = (t == 0) ? set_start : (t - 1);\r
-                               }\r
-                       }\r
-                       logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextid, nextid));\r
-               } catch (SQLException e) {\r
-                       System.err.println(e);\r
-                       e.printStackTrace();\r
-               } finally {\r
-                       db.release(conn);\r
-               }\r
-       }\r
-\r
-       @SuppressWarnings("resource")\r
-       private int[] process(File f) {\r
-               int ok = 0, total = 0;\r
-               try {\r
-                       Connection conn = db.getConnection();\r
-                       PreparedStatement ps = conn.prepareStatement(INSERT_SQL);\r
-                       Reader r = f.getPath().endsWith(".gz")\r
-                               ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))\r
-                               : new FileReader(f);\r
-                       LineNumberReader in = new LineNumberReader(r);\r
-                       String line;\r
-                       while ((line = in.readLine()) != null) {\r
-                               try {\r
-                                       for (Loadable rec : buildRecords(line)) {\r
-                                               rec.load(ps);\r
-                                               if (rec instanceof LogRecord) {\r
-                                                       LogRecord lr = ((LogRecord)rec);\r
-                                                       if (!seq_set.get(lr.getRecordId())) {\r
-                                                               ps.executeUpdate();\r
-                                                               seq_set.set(lr.getRecordId());\r
-                                                       } else\r
-                                                               logger.debug("Duplicate record ignored: "+lr.getRecordId());\r
-                                               } else {\r
-                                                       if (++nextid > set_end)\r
-                                                               nextid = set_start;\r
-                                                       ps.setLong(18, nextid);\r
-                                                       ps.executeUpdate();\r
-                                                       seq_set.set(nextid);\r
-                                               }\r
-                                               ps.clearParameters();\r
-                                               ok++;\r
-                                       }\r
-                               } catch (SQLException e) {\r
-                                       logger.warn("PROV8003 Invalid value in record: "+line);\r
-                                       logger.debug(e);\r
-                                       e.printStackTrace();\r
-                               } catch (NumberFormatException e) {\r
-                                       logger.warn("PROV8004 Invalid number in record: "+line);\r
-                                       logger.debug(e);\r
-                                       e.printStackTrace();\r
-                               } catch (ParseException e) {\r
-                                       logger.warn("PROV8005 Invalid date in record: "+line);\r
-                                       logger.debug(e);\r
-                                       e.printStackTrace();\r
-                               } catch (Exception e) {\r
-                                       logger.warn("PROV8006 Invalid pattern in record: "+line);\r
-                                       logger.debug(e);\r
-                                       e.printStackTrace();\r
-                               }\r
-                               total++;\r
-                       }\r
-                       in.close();\r
-                       ps.close();\r
-                       db.release(conn);\r
-                       conn = null;\r
-               } catch (FileNotFoundException e) {\r
-                       logger.warn("PROV8007 Exception reading "+f+": "+e);\r
-               } catch (IOException e) {\r
-                       logger.warn("PROV8007 Exception reading "+f+": "+e);\r
-               } catch (SQLException e) {\r
-                       logger.warn("PROV8007 Exception reading "+f+": "+e);\r
-               }\r
-               return new int[] { ok, total };\r
-       }\r
-       private Loadable[] buildRecords(String line) throws ParseException {\r
-               String[] pp = line.split("\\|");\r
-               if (pp != null && pp.length >= 7) {\r
-                       String rtype = pp[1].toUpperCase();\r
-                       if (rtype.equals("PUB") && pp.length == 11) {\r
-                               // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status\r
-                               return new Loadable[] { new PublishRecord(pp) };\r
-                       }\r
-                       if (rtype.equals("DEL") && pp.length == 12) {\r
-                               // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid\r
-                               String[] subs = pp[4].split("\\s+");\r
-                               if (subs != null) {\r
-                                       Loadable[] rv = new Loadable[subs.length];\r
-                                       for (int i = 0; i < subs.length; i++) {\r
-                                               // create a new record for each individual sub\r
-                                               pp[4] = subs[i];\r
-                                               rv[i] = new DeliveryRecord(pp);\r
-                                       }\r
-                                       return rv;\r
-                               }\r
-                       }\r
-                       if (rtype.equals("EXP") && pp.length == 11) {\r
-                               // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts\r
-                               ExpiryRecord e = new ExpiryRecord(pp);\r
-                               if (e.getReason().equals("other"))\r
-                                       logger.info("Invalid reason '"+pp[9]+"' changed to 'other' for record: "+e.getPublishId());\r
-                               return new Loadable[] { e };\r
-                       }\r
-                       if (rtype.equals("PBF") && pp.length == 12) {\r
-                               // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error\r
-                               return new Loadable[] { new PubFailRecord(pp) };\r
-                       }\r
-                       if (rtype.equals("DLX") && pp.length == 7) {\r
-                               // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent\r
-                               return new Loadable[] { new DeliveryExtraRecord(pp) };\r
-                       }\r
-                       if (rtype.equals("LOG") && (pp.length == 19 || pp.length == 20)) {\r
-                               // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id\r
-                               return new Loadable[] { new LogRecord(pp) };\r
-                       }\r
-               }\r
-               logger.warn("PROV8002 bad record: "+line);\r
-               return new Loadable[0];\r
-       }\r
-\r
-       /**\r
-        * The LogfileLoader can be run stand-alone by invoking the main() method of this class.\r
-        * @param a ignored\r
-        * @throws InterruptedException\r
-        */\r
-       public static void main(String[] a) throws InterruptedException {\r
-               LogfileLoader.getLoader();\r
-               Thread.sleep(200000L);\r
-       }\r
-}\r
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ *  *      http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ *  * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+
+package org.onap.dmaap.datarouter.provisioning.utils;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.Reader;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.zip.GZIPInputStream;
+import org.onap.dmaap.datarouter.provisioning.BaseServlet;
+import org.onap.dmaap.datarouter.provisioning.ProvRunner;
+import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
+import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
+import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
+
+/**
+ * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.
+ * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should be
+ * only two places where records can be loaded into this table: here, and in the method DB.retroFit4(), which may be
+ * run at startup to load the old (1.0) style log tables into LOG_RECORDS.
+ * <p>This class maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
+ * database.  This bit set is used to synchronize between provisioning servers.</p>
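+ *
+ * <p>A typical interaction, as a sketch (the provisioning server normally just starts the
+ * singleton):</p>
+ * <pre>
+ *     LogfileLoader loader = LogfileLoader.getLoader(); // creates and starts the daemon thread
+ *     RLEBitSet loaded = loader.getBitSet();            // record IDs currently in LOG_RECORDS
+ * </pre>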
+ *
+ * @author Robert Eby
+ * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
+ */
+public class LogfileLoader extends Thread {
+    /**
+     * NOT USED: Percentage of free space required before old records are removed.
+     */
+    public static final int REQUIRED_FREE_PCT = 20;
+
+    /**
+     * This is a singleton -- there is only one LogfileLoader object in the server.
+     */
+    private static LogfileLoader logfileLoader;
+
+    /**
+     * Each server can assign this many IDs.
+     */
+    private static final long SET_SIZE = (1L << 56);
+
+    private final EELFLogger logger;
+    private final String spooldir;
+    private final long setStart;
+    private final long setEnd;
+    private RLEBitSet seqSet;
+    private long nextId;
+    private boolean idle;
+
+    private LogfileLoader() {
+        this.logger = EELFManager.getInstance().getLogger("InternalLog");
+        this.spooldir = ProvRunner.getProvProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
+        this.setStart = getIdRange();
+        this.setEnd = setStart + SET_SIZE - 1;
+        this.seqSet = new RLEBitSet();
+        this.nextId = 0;
+        this.idle = false;
+        this.setDaemon(true);
+        this.setName("LogfileLoader");
+    }
+
+    /**
+     * Get the singleton LogfileLoader object, and start it if it is not running.
+     *
+     * @return the LogfileLoader
+     */
+    public static synchronized LogfileLoader getLoader() {
+        if (logfileLoader == null) {
+            logfileLoader = new LogfileLoader();
+        }
+        if (!logfileLoader.isAlive()) {
+            logfileLoader.start();
+        }
+        return logfileLoader;
+    }
+
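+    /**
+     * Determine the start of this server's RECORD_ID range.  Each server assigns IDs from a block
+     * of SET_SIZE (2^56) IDs: the initial active POD from [0, SET_SIZE), the initial standby POD
+     * from [SET_SIZE, 2*SET_SIZE), and any other server from [2*SET_SIZE, 3*SET_SIZE), which keeps
+     * the IDs assigned by the two initial PODs disjoint.
+     */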
+    private long getIdRange() {
+        long size;
+        if (BaseServlet.isInitialActivePOD()) {
+            size = 0;
+        } else if (BaseServlet.isInitialStandbyPOD()) {
+            size = SET_SIZE;
+        } else {
+            size = SET_SIZE * 2;
+        }
+        String range = String.format("[%X .. %X]", size, size + SET_SIZE - 1);
+        logger.debug("This server shall assign RECORD_IDs in the range " + range);
+        return size;
+    }
+
+    /**
+     * Return the bit set representing the record IDs that are loaded in this database.
+     *
+     * @return the bit set
+     */
+    public RLEBitSet getBitSet() {
+        return seqSet;
+    }
+
+    /**
+     * True if the LogfileLoader is currently waiting for work.
+     *
+     * @return true if idle
+     */
+    public boolean isIdle() {
+        return idle;
+    }
+
+    /**
+     * Run continuously to look for new logfiles in the spool directory and import them into the DB.
+     * The spool is checked once per second.  Before a batch of logfiles is imported, if the number
+     * of rows in the LOG_RECORDS table exceeds the configured retention threshold, the oldest
+     * entries are deleted and the table is compacted until the row count falls below the threshold.
+     */
+    @Override
+    public void run() {
+        initializeNextid();
+        while (true) {
+            try {
+                File dirfile = new File(spooldir);
+                while (true) {
+                    runLogFileLoad(dirfile);
+                }
+            } catch (Exception e) {
+                logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
+            }
+        }
+    }
+
+    private void runLogFileLoad(File filesDir) {
+        File[] inFiles = filesDir.listFiles((dir, name) -> name.startsWith("IN."));
+        if (inFiles != null) {
+            if (inFiles.length == 0) {
+                idle = true;
+                try {
+                    Thread.sleep(1000L);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                idle = false;
+            } else {
+                // Remove old rows
+                if (pruneRecords()) {
+                    // Removed at least some entries, recompute the bit map
+                    initializeNextid();
+                }
+                for (File file : inFiles) {
+                    processFile(file);
+                }
+            }
+        }
+    }
+
+    private void processFile(File infile) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("PROV8001 Starting " + infile + " ...");
+        }
+        long time = System.currentTimeMillis();
+        int[] array = process(infile);
+        time = System.currentTimeMillis() - time;
+        logger.info(String.format("PROV8000 Processed %s in %d ms; %d of %d records.",
+            infile.toString(), time, array[0], array[1]));
+        try {
+            Files.delete(infile.toPath());
+        } catch (IOException e) {
+            logger.info("PROV8001 failed to delete file " + infile.getName(), e);
+        }
+    }
+
+    boolean pruneRecords() {
+        boolean did1 = false;
+        long count = countRecords();
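+        // The baseline threshold comes from the DEFAULT_LOG_RETENTION parameter (1,000,000 rows
+        // when unset); PROV_LOG_RETENTION overrides it, but only when it parses as a number above
+        // the 1,000,000-row floor.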
+        Parameters defaultLogRetention = Parameters.getParameter(Parameters.DEFAULT_LOG_RETENTION);
+        long threshold = (defaultLogRetention != null) ? Long.parseLong(defaultLogRetention.getValue()) : 1000000L;
+        Parameters provLogRetention = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
+        if (provLogRetention != null) {
+            try {
+                long retention = Long.parseLong(provLogRetention.getValue());
+                // This check is to prevent inadvertent errors from wiping the table out
+                if (retention > 1000000L) {
+                    threshold = retention;
+                }
+            } catch (NumberFormatException e) {
+                // ignore
+            }
+        }
+        logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
+        if (count > threshold) {
+            // we need to remove this many records
+            count -= threshold;
+            // histogram of records per day
+            Map<Long, Long> hist = getHistogram();
+            // Determine the cutoff point to remove the needed number of records
+            long sum = 0;
+            long cutoff = 0;
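+            // Walk the per-day counts in ascending day order until at least 'count' excess rows
+            // are covered; e.g. (hypothetically) with {day 100: 600k rows, day 101: 500k rows} and
+            // count = 1M, the loop stops at day 101 and all rows through that day are pruned.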
+            for (Long day : new TreeSet<>(hist.keySet())) {
+                sum += hist.get(day);
+                cutoff = day;
+                if (sum >= count) {
+                    break;
+                }
+            }
+            cutoff++;
+            // convert day to ms
+            cutoff *= 86400000L;
+            logger.debug("  Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
+
+            try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
+                // Limit to a million at a time to avoid tying up the DB for too long.
+                try (PreparedStatement ps = conn.prepareStatement(
+                    "DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
+                    ps.setLong(1, cutoff);
+                    while (count > 0) {
+                        if (!ps.execute()) {
+                            int dcount = ps.getUpdateCount();
+                            count -= dcount;
+                            logger.debug("  " + dcount + " rows deleted.");
+                            did1 |= (dcount != 0);
+                            if (dcount == 0) {
+                                count = 0;    // prevent inf. loops
+                            }
+                        } else {
+                            count = 0;    // shouldn't happen!
+                        }
+                    }
+                }
+                try (Statement stmt = conn.createStatement()) {
+                    stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
+                }
+            } catch (SQLException e) {
+                logger.error("LogfileLoader.pruneRecords: " + e.getMessage(), e);
+            }
+        }
+        return did1;
+    }
+
+    private long countRecords() {
+        long count = 0;
+        try (Connection conn = ProvDbUtils.getInstance().getConnection();
+            PreparedStatement ps = conn.prepareStatement("SELECT COUNT(*) as COUNT from LOG_RECORDS");
+            ResultSet rs = ps.executeQuery()) {
+            if (rs.next()) {
+                count = rs.getLong("COUNT");
+            }
+        } catch (SQLException e) {
+            logger.error("LogfileLoader.countRecords: " + e.getMessage(), e);
+        }
+        return count;
+    }
+
+    private Map<Long, Long> getHistogram() {
+        Map<Long, Long> map = new HashMap<>();
+        try (Connection conn = ProvDbUtils.getInstance().getConnection();
+            PreparedStatement ps = conn.prepareStatement(
+                "SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY");
+            ResultSet rs = ps.executeQuery()) {
+            logger.debug("  LOG_RECORD table histogram...");
+            while (rs.next()) {
+                long day = rs.getLong("DAY");
+                long cnt = rs.getLong("COUNT");
+                map.put(day, cnt);
+                logger.debug("  " + day + "  " + cnt);
+            }
+        } catch (SQLException e) {
+            logger.error("LogfileLoader.getHistogram: " + e.getMessage(), e);
+        }
+        return map;
+    }
+
+    private void initializeNextid() {
+        try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
+            RLEBitSet nbs = new RLEBitSet();
+            try (Statement stmt = conn.createStatement()) {
+                // Build a bitset of all records in the LOG_RECORDS table
+                // We need to run this SELECT in stages, because otherwise we run out of memory!
+                final long stepsize = 6000000L;
+                boolean goAgain = true;
+                for (long i = 0; goAgain; i += stepsize) {
+                    String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
+                    try (ResultSet rs = stmt.executeQuery(sql)) {
+                        goAgain = false;
+                        while (rs.next()) {
+                            long recordId = rs.getLong("RECORD_ID");
+                            nbs.set(recordId);
+                            goAgain = true;
+                        }
+                    }
+                }
+            }
+            seqSet = nbs;
+            // Compare with the range for this server
+            // Determine the next ID for this set of record IDs
+            RLEBitSet tbs = (RLEBitSet) nbs.clone();
+            RLEBitSet idset = new RLEBitSet();
+            idset.set(setStart, setStart + SET_SIZE);
+            tbs.and(idset);
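+            // tbs.length() is one past the highest RECORD_ID in use within this server's range,
+            // so nextId starts at the highest assigned ID; process() pre-increments before use.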
+            long bitLength = tbs.length();
+            nextId = (bitLength == 0) ? setStart : (bitLength - 1);
+            if (nextId >= setStart + SET_SIZE) {
+                // Handle wraparound, when the IDs reach the end of our "range"
+                Long[] last = null;
+                Iterator<Long[]> li = tbs.getRangeIterator();
+                while (li.hasNext()) {
+                    last = li.next();
+                }
+                if (last != null) {
+                    tbs.clear(last[0], last[1] + 1);
+                    bitLength = tbs.length();
+                    nextId = (bitLength == 0) ? setStart : (bitLength - 1);
+                }
+            }
+            logger.debug(String.format("LogfileLoader.initializeNextid, next ID is %d (%x)", nextId, nextId));
+        } catch (SQLException e) {
+            logger.error("LogfileLoader.initializeNextid: " + e.getMessage(), e);
+        }
+    }
+
+    @SuppressWarnings("resource")
+    int[] process(File file) {
+        int ok = 0;
+        int total = 0;
+        try (Connection conn = ProvDbUtils.getInstance().getConnection();
+            PreparedStatement ps = conn.prepareStatement(
+                "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
+            Reader reader = file.getPath().endsWith(".gz")
+                ? new InputStreamReader(new GZIPInputStream(new FileInputStream(file)))
+                : new FileReader(file);
+            try (LineNumberReader in = new LineNumberReader(reader)) {
+                String line;
+                while ((line = in.readLine()) != null) {
+                    try {
+                        for (Loadable rec : buildRecords(line)) {
+                            rec.load(ps);
+                            if (rec instanceof LogRecord) {
+                                LogRecord lr = ((LogRecord) rec);
+                                if (!seqSet.get(lr.getRecordId())) {
+                                    ps.executeUpdate();
+                                    seqSet.set(lr.getRecordId());
+                                } else {
+                                    logger.debug("Duplicate record ignored: " + lr.getRecordId());
+                                }
+                            } else {
+                                if (++nextId > setEnd) {
+                                    nextId = setStart;
+                                }
+                                ps.setLong(18, nextId);
+                                ps.executeUpdate();
+                                seqSet.set(nextId);
+                            }
+                            ps.clearParameters();
+                            ok++;
+                        }
+                    } catch (SQLException e) {
+                        logger.warn("PROV8003 Invalid value in record: " + line, e);
+                    } catch (NumberFormatException e) {
+                        logger.warn("PROV8004 Invalid number in record: " + line, e);
+                    } catch (ParseException e) {
+                        logger.warn("PROV8005 Invalid date in record: " + line, e);
+                    } catch (Exception e) {
+                        logger.warn("PROV8006 Invalid pattern in record: " + line, e);
+                    }
+                    total++;
+                }
+            }
+        } catch (SQLException | IOException e) {
+            logger.warn("PROV8007 Exception reading " + file + ": " + e);
+        }
+        return new int[]{ok, total};
+    }
+
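+    /**
+     * Parse one pipe-delimited logfile line into zero or more Loadable records, keyed on the
+     * record type in the second field.  A hypothetical PUB line, for example:
+     * <pre>  1503378888884|PUB|pub12345|42|https://dr.example/publish/42/file.txt|PUT|text/plain|100|10.1.2.3|user1|204</pre>
+     *
+     * @param line one line from a spooled logfile
+     * @return the parsed records, or an empty array if the line is unrecognized
+     * @throws ParseException if a date or other field cannot be parsed
+     */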
+    Loadable[] buildRecords(String line) throws ParseException {
+        String[] pp = line.split("\\|");
+        if (pp != null && pp.length >= 7) {
+            String rtype = pp[1].toUpperCase();
+            if ("PUB".equals(rtype) && pp.length == 11) {
+                // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
+                return new Loadable[]{new PublishRecord(pp)};
+            }
+            if ("DEL".equals(rtype) && pp.length == 12) {
+                // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
+                String[] subs = pp[4].split("\\s+");
+                if (subs != null) {
+                    Loadable[] rv = new Loadable[subs.length];
+                    for (int i = 0; i < subs.length; i++) {
+                        // create a new record for each individual sub
+                        pp[4] = subs[i];
+                        rv[i] = new DeliveryRecord(pp);
+                    }
+                    return rv;
+                }
+            }
+            if ("EXP".equals(rtype) && pp.length == 11) {
+                // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
+                ExpiryRecord expiryRecord = new ExpiryRecord(pp);
+                if ("other".equals(expiryRecord.getReason())) {
+                    logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: "
+                        + expiryRecord.getPublishId());
+                }
+                return new Loadable[]{expiryRecord};
+            }
+            if ("PBF".equals(rtype) && pp.length == 12) {
+                // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
+                return new Loadable[]{new PubFailRecord(pp)};
+            }
+            if ("DLX".equals(rtype) && pp.length == 7) {
+                // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
+                return new Loadable[]{new DeliveryExtraRecord(pp)};
+            }
+            if ("LOG".equals(rtype) && (pp.length == 19 || pp.length == 20)) {
+                // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|
+                // feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
+                return new Loadable[]{new LogRecord(pp)};
+            }
+        }
+        logger.warn("PROV8002 bad record: " + line);
+        return new Loadable[0];
+    }
+
+    /**
+     * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
+     *
+     * @param str ignored
+     */
+    public static void main(String[] str) throws InterruptedException {
+        LogfileLoader.getLoader();
+        Thread.sleep(200000L);
+    }
+}