-/*******************************************************************************\r
- * ============LICENSE_START==================================================\r
- * * org.onap.dmaap\r
- * * ===========================================================================\r
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
- * * ===========================================================================\r
- * * Licensed under the Apache License, Version 2.0 (the "License");\r
- * * you may not use this file except in compliance with the License.\r
- * * You may obtain a copy of the License at\r
- * *\r
- * * http://www.apache.org/licenses/LICENSE-2.0\r
- * *\r
- * * Unless required by applicable law or agreed to in writing, software\r
- * * distributed under the License is distributed on an "AS IS" BASIS,\r
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
- * * See the License for the specific language governing permissions and\r
- * * limitations under the License.\r
- * * ============LICENSE_END====================================================\r
- * *\r
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
- * *\r
- ******************************************************************************/\r
-\r
-\r
-package org.onap.dmaap.datarouter.provisioning.utils;\r
-\r
-import java.io.File;\r
-import java.io.FileInputStream;\r
-import java.io.FileNotFoundException;\r
-import java.io.FileReader;\r
-import java.io.FilenameFilter;\r
-import java.io.IOException;\r
-import java.io.InputStreamReader;\r
-import java.io.LineNumberReader;\r
-import java.io.Reader;\r
-import java.sql.Connection;\r
-import java.sql.PreparedStatement;\r
-import java.sql.ResultSet;\r
-import java.sql.SQLException;\r
-import java.sql.Statement;\r
-import java.text.ParseException;\r
-import java.util.Date;\r
-import java.util.HashMap;\r
-import java.util.Iterator;\r
-import java.util.Map;\r
-import java.util.TreeSet;\r
-import java.util.zip.GZIPInputStream;\r
-\r
-import org.apache.log4j.Logger;\r
-import org.onap.dmaap.datarouter.provisioning.BaseServlet;\r
-import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;\r
-import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;\r
-import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;\r
-import org.onap.dmaap.datarouter.provisioning.beans.Loadable;\r
-import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;\r
-import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
-import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;\r
-import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;\r
-\r
-/**\r
- * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.\r
- * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be\r
- * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at\r
- * startup to load the old (1.0) style log tables into LOG_RECORDS;\r
- * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the\r
- * database.\r
- * This bit set is used to synchronize between provisioning servers.</p>\r
- *\r
- * @author Robert Eby\r
- * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $\r
- */\r
-public class LogfileLoader extends Thread {\r
- /**\r
- * Default number of log records to keep when pruning. Keep 10M by default.\r
- */\r
- public static final long DEFAULT_LOG_RETENTION = 10000000L;\r
- /**\r
- * NOT USED: Percentage of free space required before old records are removed.\r
- */\r
- public static final int REQUIRED_FREE_PCT = 20;\r
-\r
- /**\r
- * This is a singleton -- there is only one LogfileLoader object in the server\r
- */\r
- private static LogfileLoader p;\r
-\r
- /**\r
- * Get the singleton LogfileLoader object, and start it if it is not running.\r
- *\r
- * @return the LogfileLoader\r
- */\r
- public static synchronized LogfileLoader getLoader() {\r
- if (p == null)\r
- p = new LogfileLoader();\r
- if (!p.isAlive())\r
- p.start();\r
- return p;\r
- }\r
-\r
- /**\r
- * The PreparedStatement which is loaded by a <i>Loadable</i>.\r
- */\r
- public static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";\r
- /**\r
- * Each server can assign this many IDs\r
- */\r
- private static final long SET_SIZE = (1L << 56);\r
-\r
- private final Logger logger;\r
- private final DB db;\r
- private final String spooldir;\r
- private final long set_start;\r
- private final long set_end;\r
- private RLEBitSet seq_set;\r
- private long nextid;\r
- private boolean idle;\r
-\r
- private LogfileLoader() {\r
- this.logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");\r
- this.db = new DB();\r
- this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");\r
- this.set_start = getIdRange();\r
- this.set_end = set_start + SET_SIZE - 1;\r
- this.seq_set = new RLEBitSet();\r
- this.nextid = 0;\r
- this.idle = false;\r
-\r
- // This is a potentially lengthy operation, so has been moved to run()\r
- //initializeNextid();\r
- this.setDaemon(true);\r
- this.setName("LogfileLoader");\r
- }\r
-\r
- private long getIdRange() {\r
- long n;\r
- if (BaseServlet.isInitialActivePOD())\r
- n = 0;\r
- else if (BaseServlet.isInitialStandbyPOD())\r
- n = SET_SIZE;\r
- else\r
- n = SET_SIZE * 2;\r
- String r = String.format("[%X .. %X]", n, n + SET_SIZE - 1);\r
- logger.debug("This server shall assign RECORD_IDs in the range " + r);\r
- return n;\r
- }\r
-\r
- /**\r
- * Return the bit set representing the record ID's that are loaded in this database.\r
- *\r
- * @return the bit set\r
- */\r
- public RLEBitSet getBitSet() {\r
- return seq_set;\r
- }\r
-\r
- /**\r
- * True if the LogfileLoader is currently waiting for work.\r
- *\r
- * @return true if idle\r
- */\r
- public boolean isIdle() {\r
- return idle;\r
- }\r
-\r
- /**\r
- * Run continuously to look for new logfiles in the spool directory and import them into the DB.\r
- * The spool is checked once per second. If free space on the MariaDB filesystem falls below\r
- * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS\r
- * table is compacted until free space rises above the threshold.\r
- */\r
- @Override\r
- public void run() {\r
- initializeNextid(); // moved from the constructor\r
- while (true) {\r
- try {\r
- File dirfile = new File(spooldir);\r
- while (true) {\r
- // process IN files\r
- File[] infiles = dirfile.listFiles(new FilenameFilter() {\r
- @Override\r
- public boolean accept(File dir, String name) {\r
- return name.startsWith("IN.");\r
- }\r
- });\r
-\r
- if (infiles.length == 0) {\r
- idle = true;\r
- try {\r
- Thread.sleep(1000L);\r
- } catch (InterruptedException e) {\r
- Thread.currentThread().interrupt();\r
- }\r
- idle = false;\r
- } else {\r
- // Remove old rows\r
- if (pruneRecords()) {\r
- // Removed at least some entries, recompute the bit map\r
- initializeNextid();\r
- }\r
-\r
- // Process incoming logfiles\r
- for (File f : infiles) {\r
- if (logger.isDebugEnabled())\r
- logger.debug("PROV8001 Starting " + f + " ...");\r
- long time = System.currentTimeMillis();\r
- int[] n = process(f);\r
- time = System.currentTimeMillis() - time;\r
- logger.info(String\r
- .format("PROV8000 Processed %s in %d ms; %d of %d records.",\r
- f.toString(), time, n[0], n[1]));\r
- f.delete();\r
- }\r
- }\r
- }\r
- } catch (Exception e) {\r
- logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);\r
- }\r
- }\r
- }\r
-\r
- boolean pruneRecords() {\r
- boolean did1 = false;\r
- long count = countRecords();\r
- long threshold = DEFAULT_LOG_RETENTION;\r
- Parameters param = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);\r
- if (param != null) {\r
- try {\r
- long n = Long.parseLong(param.getValue());\r
- // This check is to prevent inadvertent errors from wiping the table out\r
- if (n > 1000000L)\r
- threshold = n;\r
- } catch (NumberFormatException e) {\r
- // ignore\r
- }\r
- }\r
- logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);\r
- if (count > threshold) {\r
- count -= threshold; // we need to remove this many records;\r
- Map<Long, Long> hist = getHistogram(); // histogram of records per day\r
- // Determine the cutoff point to remove the needed number of records\r
- long sum = 0;\r
- long cutoff = 0;\r
- for (Long day : new TreeSet<Long>(hist.keySet())) {\r
- sum += hist.get(day);\r
- cutoff = day;\r
- if (sum >= count)\r
- break;\r
- }\r
- cutoff++;\r
- cutoff *= 86400000L; // convert day to ms\r
- logger.debug(" Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");\r
-\r
- Connection conn = null;\r
- try {\r
- // Limit to a million at a time to avoid typing up the DB for too long.\r
- conn = db.getConnection();\r
- try(PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {\r
- ps.setLong(1, cutoff);\r
- while (count > 0) {\r
- if (!ps.execute()) {\r
- int dcount = ps.getUpdateCount();\r
- count -= dcount;\r
- logger.debug(" " + dcount + " rows deleted.");\r
- did1 |= (dcount != 0);\r
- if (dcount == 0)\r
- count = 0; // prevent inf. loops\r
- } else {\r
- count = 0; // shouldn't happen!\r
- }\r
- }\r
- }\r
- try(Statement stmt = conn.createStatement()) {\r
- stmt.execute("OPTIMIZE TABLE LOG_RECORDS");\r
- }\r
- } catch (SQLException e) {\r
- System.err.println(e);\r
- logger.error(e);\r
- } finally {\r
- db.release(conn);\r
- }\r
- }\r
- return did1;\r
- }\r
-\r
- long countRecords() {\r
- long count = 0;\r
- Connection conn = null;\r
- try {\r
- conn = db.getConnection();\r
- try(Statement stmt = conn.createStatement()) {\r
- try(ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) {\r
- if (rs.next()) {\r
- count = rs.getLong("COUNT");\r
- }\r
- }\r
- }\r
- } catch (SQLException e) {\r
- System.err.println(e);\r
- logger.error(e);\r
- } finally {\r
- db.release(conn);\r
- }\r
- return count;\r
- }\r
-\r
- Map<Long, Long> getHistogram() {\r
- Map<Long, Long> map = new HashMap<Long, Long>();\r
- Connection conn = null;\r
- try {\r
- logger.debug(" LOG_RECORD table histogram...");\r
- conn = db.getConnection();\r
- try(Statement stmt = conn.createStatement()) {\r
- try(ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) {\r
- while (rs.next()) {\r
- long day = rs.getLong("DAY");\r
- long cnt = rs.getLong("COUNT");\r
- map.put(day, cnt);\r
- logger.debug(" " + day + " " + cnt);\r
- }\r
- }\r
- }\r
- } catch (SQLException e) {\r
- System.err.println(e);\r
- logger.error(e);\r
- } finally {\r
- db.release(conn);\r
- }\r
- return map;\r
- }\r
-\r
- private void initializeNextid() {\r
- Connection conn = null;\r
- try {\r
- conn = db.getConnection();\r
- RLEBitSet nbs = new RLEBitSet();\r
- try(Statement stmt = conn.createStatement()) {\r
- // Build a bitset of all records in the LOG_RECORDS table\r
- // We need to run this SELECT in stages, because otherwise we run out of memory!\r
- final long stepsize = 6000000L;\r
- boolean go_again = true;\r
- for (long i = 0; go_again; i += stepsize) {\r
- String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);\r
- try (ResultSet rs = stmt.executeQuery(sql)) {\r
- go_again = false;\r
- while (rs.next()) {\r
- long n = rs.getLong("RECORD_ID");\r
- nbs.set(n);\r
- go_again = true;\r
- }\r
- }\r
- }\r
- }\r
- seq_set = nbs;\r
- // Compare with the range for this server\r
- // Determine the next ID for this set of record IDs\r
- RLEBitSet tbs = (RLEBitSet) nbs.clone();\r
- RLEBitSet idset = new RLEBitSet();\r
- idset.set(set_start, set_start + SET_SIZE);\r
- tbs.and(idset);\r
- long t = tbs.length();\r
- nextid = (t == 0) ? set_start : (t - 1);\r
- if (nextid >= set_start + SET_SIZE) {\r
- // Handle wraparound, when the IDs reach the end of our "range"\r
- Long[] last = null;\r
- Iterator<Long[]> li = tbs.getRangeIterator();\r
- while (li.hasNext()) {\r
- last = li.next();\r
- }\r
- if (last != null) {\r
- tbs.clear(last[0], last[1] + 1);\r
- t = tbs.length();\r
- nextid = (t == 0) ? set_start : (t - 1);\r
- }\r
- }\r
- logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextid, nextid));\r
- } catch (SQLException e) {\r
- System.err.println(e);\r
- logger.error(e);\r
- } finally {\r
- db.release(conn);\r
- }\r
- }\r
-\r
- @SuppressWarnings("resource")\r
- int[] process(File f) {\r
- int ok = 0, total = 0;\r
- try {\r
- Connection conn = db.getConnection();\r
- PreparedStatement ps = conn.prepareStatement(INSERT_SQL);\r
- Reader r = f.getPath().endsWith(".gz")\r
- ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))\r
- : new FileReader(f);\r
- try(LineNumberReader in = new LineNumberReader(r)) {\r
- String line;\r
- while ((line = in.readLine()) != null) {\r
- try {\r
- for (Loadable rec : buildRecords(line)) {\r
- rec.load(ps);\r
- if (rec instanceof LogRecord) {\r
- LogRecord lr = ((LogRecord) rec);\r
- if (!seq_set.get(lr.getRecordId())) {\r
- ps.executeUpdate();\r
- seq_set.set(lr.getRecordId());\r
- } else\r
- logger.debug("Duplicate record ignored: " + lr.getRecordId());\r
- } else {\r
- if (++nextid > set_end)\r
- nextid = set_start;\r
- ps.setLong(18, nextid);\r
- ps.executeUpdate();\r
- seq_set.set(nextid);\r
- }\r
- ps.clearParameters();\r
- ok++;\r
- }\r
- } catch (SQLException e) {\r
- logger.warn("PROV8003 Invalid value in record: " + line);\r
- logger.debug(e);\r
- } catch (NumberFormatException e) {\r
- logger.warn("PROV8004 Invalid number in record: " + line);\r
- logger.debug(e);\r
- } catch (ParseException e) {\r
- logger.warn("PROV8005 Invalid date in record: " + line);\r
- logger.debug(e);\r
- } catch (Exception e) {\r
- logger.warn("PROV8006 Invalid pattern in record: " + line);\r
- logger.debug(e);\r
- }\r
- total++;\r
- }\r
- }\r
- ps.close();\r
- db.release(conn);\r
- conn = null;\r
- } catch (FileNotFoundException e) {\r
- logger.warn("PROV8007 Exception reading " + f + ": " + e);\r
- } catch (IOException e) {\r
- logger.warn("PROV8007 Exception reading " + f + ": " + e);\r
- } catch (SQLException e) {\r
- logger.warn("PROV8007 Exception reading " + f + ": " + e);\r
- }\r
- return new int[]{ok, total};\r
- }\r
-\r
- Loadable[] buildRecords(String line) throws ParseException {\r
- String[] pp = line.split("\\|");\r
- if (pp != null && pp.length >= 7) {\r
- String rtype = pp[1].toUpperCase();\r
- if (rtype.equals("PUB") && pp.length == 11) {\r
- // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status\r
- return new Loadable[]{new PublishRecord(pp)};\r
- }\r
- if (rtype.equals("DEL") && pp.length == 12) {\r
- // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid\r
- String[] subs = pp[4].split("\\s+");\r
- if (subs != null) {\r
- Loadable[] rv = new Loadable[subs.length];\r
- for (int i = 0; i < subs.length; i++) {\r
- // create a new record for each individual sub\r
- pp[4] = subs[i];\r
- rv[i] = new DeliveryRecord(pp);\r
- }\r
- return rv;\r
- }\r
- }\r
- if (rtype.equals("EXP") && pp.length == 11) {\r
- // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts\r
- ExpiryRecord e = new ExpiryRecord(pp);\r
- if (e.getReason().equals("other"))\r
- logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: " + e.getPublishId());\r
- return new Loadable[]{e};\r
- }\r
- if (rtype.equals("PBF") && pp.length == 12) {\r
- // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error\r
- return new Loadable[]{new PubFailRecord(pp)};\r
- }\r
- if (rtype.equals("DLX") && pp.length == 7) {\r
- // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent\r
- return new Loadable[]{new DeliveryExtraRecord(pp)};\r
- }\r
- if (rtype.equals("LOG") && (pp.length == 19 || pp.length == 20)) {\r
- // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id\r
- return new Loadable[]{new LogRecord(pp)};\r
- }\r
- }\r
- logger.warn("PROV8002 bad record: " + line);\r
- return new Loadable[0];\r
- }\r
-\r
- /**\r
- * The LogfileLoader can be run stand-alone by invoking the main() method of this class.\r
- *\r
- * @param a ignored\r
- * @throws InterruptedException\r
- */\r
- public static void main(String[] a) throws InterruptedException {\r
- LogfileLoader.getLoader();\r
- Thread.sleep(200000L);\r
- }\r
-}\r
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+
+package org.onap.dmaap.datarouter.provisioning.utils;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.Reader;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.zip.GZIPInputStream;
+import org.onap.dmaap.datarouter.provisioning.BaseServlet;
+import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
+import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
+import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
+
+/**
+ * This class provides methods that run in a separate thread to process logfiles uploaded into the spooldir.
+ * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should be
+ * only two places where records can be loaded into this table: here, and in the method DB.retroFit4(), which may be
+ * run at startup to load the old (1.0) style log tables into LOG_RECORDS.
+ * <p>This class maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
+ * database.
+ * This bit set is used to synchronize between provisioning servers.</p>
+ *
+ * @author Robert Eby
+ * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
+ */
+public class LogfileLoader extends Thread {
+ /**
+ * NOT USED: Percentage of free space required before old records are removed.
+ */
+ public static final int REQUIRED_FREE_PCT = 20;
+
+ /**
+ * This is a singleton -- there is only one LogfileLoader object in the server.
+ */
+ private static LogfileLoader logfileLoader;
+
+ /**
+ * The SQL statement used to prepare the {@link PreparedStatement} that is loaded by a <i>Loadable</i>.
+ */
+ private static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ /**
+ * Each server can assign this many IDs.
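+ * <p>For example, with SET_SIZE = 1L &lt;&lt; 56, the initial active POD assigns RECORD_IDs in
+ * [0x0 .. 0xFFFFFFFFFFFFFF], the initial standby POD in [0x100000000000000 .. 0x1FFFFFFFFFFFFFF],
+ * and any other server in [0x200000000000000 .. 0x2FFFFFFFFFFFFFF]; see {@link #getIdRange()}.</p>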
+ */
+ private static final long SET_SIZE = (1L << 56);
+
+ private final EELFLogger logger;
+ private final DB db;
+ private final String spooldir;
+ private final long setStart;
+ private final long setEnd;
+ private RLEBitSet seqSet;
+ private long nextId;
+ private volatile boolean idle; // read by other threads via isIdle()
+
+ /**
+ * Get the singleton LogfileLoader object, and start it if it is not running.
+ *
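+ * <p>A minimal usage sketch (hypothetical caller; Thread.sleep's InterruptedException
+ * handling is elided):</p>
+ * <pre>{@code
+ * LogfileLoader loader = LogfileLoader.getLoader(); // creates and starts the daemon thread if needed
+ * while (!loader.isIdle()) {
+ *     Thread.sleep(1000L); // wait for the spool directory to drain
+ * }
+ * RLEBitSet loaded = loader.getBitSet(); // record IDs currently in LOG_RECORDS
+ * }</pre>
+ *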
+ * @return the LogfileLoader
+ */
+ public static synchronized LogfileLoader getLoader() {
+ if (logfileLoader == null) {
+ logfileLoader = new LogfileLoader();
+ }
+ if (!logfileLoader.isAlive()) {
+ logfileLoader.start();
+ }
+ return logfileLoader;
+ }
+
+
+ private LogfileLoader() {
+ this.logger = EELFManager.getInstance().getLogger("InternalLog");
+ this.db = new DB();
+ this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
+ this.setStart = getIdRange();
+ this.setEnd = setStart + SET_SIZE - 1;
+ this.seqSet = new RLEBitSet();
+ this.nextId = 0;
+ this.idle = false;
+ this.setDaemon(true);
+ this.setName("LogfileLoader");
+ }
+
+ private long getIdRange() {
+ long n;
+ if (BaseServlet.isInitialActivePOD()) {
+ n = 0;
+ } else if (BaseServlet.isInitialStandbyPOD()) {
+ n = SET_SIZE;
+ } else {
+ n = SET_SIZE * 2;
+ }
+ String r = String.format("[%X .. %X]", n, n + SET_SIZE - 1);
+ logger.debug("This server shall assign RECORD_IDs in the range " + r);
+ return n;
+ }
+
+ /**
+ * Return the bit set representing the record IDs that are loaded in this database.
+ *
+ * @return the bit set
+ */
+ public RLEBitSet getBitSet() {
+ return seqSet;
+ }
+
+ /**
+ * True if the LogfileLoader is currently waiting for work.
+ *
+ * @return true if idle
+ */
+ public boolean isIdle() {
+ return idle;
+ }
+
+ /**
+ * Run continuously to look for new logfiles in the spool directory and import them into the DB.
+ * The spool is checked once per second. Before importing, if the LOG_RECORDS table has grown past
+ * the configured retention threshold (the PROV_LOG_RETENTION parameter, guarded to at least one
+ * million records), the oldest entries are deleted until the count drops back to the threshold,
+ * and the table is then compacted.
+ */
+ @Override
+ public void run() {
+ initializeNextid();
+ while (true) {
+ try {
+ File dirfile = new File(spooldir);
+ while (true) {
+ runLogFileLoad(dirfile);
+ }
+ } catch (Exception e) {
+ logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
+ }
+ }
+ }
+
+ private void runLogFileLoad(File filesDir) {
+ File[] inFiles = filesDir.listFiles((dir, name) -> name.startsWith("IN."));
+ if (inFiles != null) {
+ if (inFiles.length == 0) {
+ idle = true;
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ idle = false;
+ } else {
+ // Remove old rows
+ if (pruneRecords()) {
+ // Removed at least some entries, recompute the bit map
+ initializeNextid();
+ }
+ for (File file : inFiles) {
+ processFile(file);
+ }
+ }
+ }
+ }
+
+ private void processFile(File infile) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("PROV8001 Starting " + infile + " ...");
+ }
+ long time = System.currentTimeMillis();
+ int[] n = process(infile);
+ time = System.currentTimeMillis() - time;
+ logger.info(String.format("PROV8000 Processed %s in %d ms; %d of %d records.",
+ infile.toString(), time, n[0], n[1]));
+ try {
+ Files.delete(infile.toPath());
+ } catch (IOException e) {
+ logger.warn("PROV8001 failed to delete file " + infile.getName(), e);
+ }
+ }
+
+ boolean pruneRecords() {
+ boolean didDelete = false;
+ long count = countRecords();
+ Parameters defaultLogRetention = Parameters.getParameter(Parameters.DEFAULT_LOG_RETENTION);
+ long threshold = (defaultLogRetention != null) ? Long.parseLong(defaultLogRetention.getValue()) : 1000000L;
+ Parameters provLogRetention = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
+ if (provLogRetention != null) {
+ try {
+ long n = Long.parseLong(provLogRetention.getValue());
+ // This check is to prevent inadvertent errors from wiping the table out
+ if (n > 1000000L) {
+ threshold = n;
+ }
+ } catch (NumberFormatException e) {
+ // ignore
+ }
+ }
+ logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
+ if (count > threshold) {
+ // we need to remove this many records
+ count -= threshold;
+ // histogram of records per day
+ Map<Long, Long> hist = getHistogram();
+ // Determine the cutoff point to remove the needed number of records
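+ // Worked example (hypothetical numbers): with 1.5M excess records and a histogram of
+ // day 19500 -> 1M and day 19501 -> 1M, the loop stops at day 19501; everything with
+ // EVENT_TIME < (19501 + 1) * 86400000 ms then becomes eligible for deletion below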
+ long sum = 0;
+ long cutoff = 0;
+ for (Long day : new TreeSet<>(hist.keySet())) {
+ sum += hist.get(day);
+ cutoff = day;
+ if (sum >= count) {
+ break;
+ }
+ }
+ cutoff++;
+ // convert day to ms
+ cutoff *= 86400000L;
+ logger.debug(" Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
+
+ Connection conn = null;
+ try {
+ // Limit to a million at a time to avoid tying up the DB for too long.
+ conn = db.getConnection();
+ try (PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
+ ps.setLong(1, cutoff);
+ while (count > 0) {
+ if (!ps.execute()) {
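+ // JDBC execute() returns false for a row-count result, so getUpdateCount()
+ // reports how many rows this DELETE pass removed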
+ int dcount = ps.getUpdateCount();
+ count -= dcount;
+ logger.debug(" " + dcount + " rows deleted.");
+ didDelete |= (dcount != 0);
+ if (dcount == 0) {
+ count = 0; // prevent inf. loops
+ }
+ } else {
+ count = 0; // shouldn't happen!
+ }
+ }
+ }
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
+ }
+ } catch (SQLException e) {
+ logger.error(e.toString());
+ } finally {
+ db.release(conn);
+ }
+ }
+ return didDelete;
+ }
+
+ long countRecords() {
+ long count = 0;
+ Connection conn = null;
+ try {
+ conn = db.getConnection();
+ try (Statement stmt = conn.createStatement()) {
+ try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) {
+ if (rs.next()) {
+ count = rs.getLong("COUNT");
+ }
+ }
+ }
+ } catch (SQLException e) {
+ logger.error(e.toString());
+ } finally {
+ db.release(conn);
+ }
+ return count;
+ }
+
+ Map<Long, Long> getHistogram() {
+ Map<Long, Long> map = new HashMap<>();
+ Connection conn = null;
+ try {
+ logger.debug(" LOG_RECORD table histogram...");
+ conn = db.getConnection();
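+ // EVENT_TIME is in ms since the epoch, so FLOOR(EVENT_TIME/86400000) buckets records
+ // into whole days since the epoch (day 19500, for example, is 2023-05-23 UTC)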
+ try (Statement stmt = conn.createStatement()) {
+ try (ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) {
+ while (rs.next()) {
+ long day = rs.getLong("DAY");
+ long cnt = rs.getLong("COUNT");
+ map.put(day, cnt);
+ logger.debug(" " + day + " " + cnt);
+ }
+ }
+ }
+ } catch (SQLException e) {
+ logger.error(e.toString());
+ } finally {
+ db.release(conn);
+ }
+ return map;
+ }
+
+ private void initializeNextid() {
+ Connection conn = null;
+ try {
+ conn = db.getConnection();
+ RLEBitSet nbs = new RLEBitSet();
+ try (Statement stmt = conn.createStatement()) {
+ // Build a bitset of all records in the LOG_RECORDS table
+ // We need to run this SELECT in stages, because otherwise we run out of memory!
+ final long stepsize = 6000000L;
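+ // MariaDB "LIMIT offset,count" paging: read up to 6M RECORD_IDs per pass and
+ // stop once a pass returns no rows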
+ boolean goAgain = true;
+ for (long i = 0; goAgain; i += stepsize) {
+ String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
+ try (ResultSet rs = stmt.executeQuery(sql)) {
+ goAgain = false;
+ while (rs.next()) {
+ long n = rs.getLong("RECORD_ID");
+ nbs.set(n);
+ goAgain = true;
+ }
+ }
+ }
+ }
+ seqSet = nbs;
+ // Compare with the range for this server
+ // Determine the next ID for this set of record IDs
+ RLEBitSet tbs = (RLEBitSet) nbs.clone();
+ RLEBitSet idset = new RLEBitSet();
+ idset.set(setStart, setStart + SET_SIZE);
+ tbs.and(idset);
+ long t = tbs.length();
+ nextId = (t == 0) ? setStart : (t - 1);
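+ // tbs.length() is the index of the highest set bit plus one (presumably mirroring
+ // java.util.BitSet), so nextId holds the highest RECORD_ID already used in our range;
+ // process() pre-increments it before assigning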
+ if (nextId >= setStart + SET_SIZE) {
+ // Handle wraparound, when the IDs reach the end of our "range"
+ Long[] last = null;
+ Iterator<Long[]> li = tbs.getRangeIterator();
+ while (li.hasNext()) {
+ last = li.next();
+ }
+ if (last != null) {
+ tbs.clear(last[0], last[1] + 1);
+ t = tbs.length();
+ nextId = (t == 0) ? setStart : (t - 1);
+ }
+ }
+ logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextId, nextId));
+ } catch (SQLException e) {
+ logger.error(e.toString());
+ } finally {
+ db.release(conn);
+ }
+ }
+
+ @SuppressWarnings("resource")
+ int[] process(File f) {
+ int ok = 0;
+ int total = 0;
+ try {
+ Connection conn = db.getConnection();
+ PreparedStatement ps = conn.prepareStatement(INSERT_SQL);
+ Reader r = f.getPath().endsWith(".gz")
+ ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))
+ : new FileReader(f);
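+ // r is closed via the LineNumberReader in the try-with-resources below;
+ // the @SuppressWarnings("resource") above covers this indirect ownership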
+ try (LineNumberReader in = new LineNumberReader(r)) {
+ String line;
+ while ((line = in.readLine()) != null) {
+ try {
+ for (Loadable rec : buildRecords(line)) {
+ rec.load(ps);
+ if (rec instanceof LogRecord) {
+ LogRecord lr = ((LogRecord) rec);
+ if (!seqSet.get(lr.getRecordId())) {
+ ps.executeUpdate();
+ seqSet.set(lr.getRecordId());
+ } else {
+ logger.debug("Duplicate record ignored: " + lr.getRecordId());
+ }
+ } else {
+ if (++nextId > setEnd) {
+ nextId = setStart;
+ }
+ ps.setLong(18, nextId);
+ ps.executeUpdate();
+ seqSet.set(nextId);
+ }
+ ps.clearParameters();
+ ok++;
+ }
+ } catch (SQLException e) {
+ logger.warn("PROV8003 Invalid value in record: " + line, e);
+ } catch (NumberFormatException e) {
+ logger.warn("PROV8004 Invalid number in record: " + line, e);
+ } catch (ParseException e) {
+ logger.warn("PROV8005 Invalid date in record: " + line, e);
+ } catch (Exception e) {
+ logger.warn("PROV8006 Invalid pattern in record: " + line, e);
+ }
+ total++;
+ }
+ }
+ ps.close();
+ db.release(conn);
+ } catch (SQLException | IOException e) {
+ logger.warn("PROV8007 Exception reading " + f + ": " + e);
+ }
+ return new int[]{ok, total};
+ }
+
+ Loadable[] buildRecords(String line) throws ParseException {
+ String[] pp = line.split("\\|");
+ // line.split() never returns null, so only the field count needs checking
+ if (pp.length >= 7) {
+ String rtype = pp[1].toUpperCase();
+ if ("PUB".equals(rtype) && pp.length == 11) {
+ // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
+ return new Loadable[]{new PublishRecord(pp)};
+ }
+ if ("DEL".equals(rtype) && pp.length == 12) {
+ // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
+ String[] subs = pp[4].split("\\s+");
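+ // e.g. a subid field of "5 17 42" fans out into three DeliveryRecords, one per subscriber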
+ Loadable[] rv = new Loadable[subs.length];
+ for (int i = 0; i < subs.length; i++) {
+ // create a new record for each individual sub
+ pp[4] = subs[i];
+ rv[i] = new DeliveryRecord(pp);
+ }
+ return rv;
+ }
+ if ("EXP".equals(rtype) && pp.length == 11) {
+ // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
+ ExpiryRecord e = new ExpiryRecord(pp);
+ if ("other".equals(e.getReason())) {
+ logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: " + e.getPublishId());
+ }
+ return new Loadable[]{e};
+ }
+ if ("PBF".equals(rtype) && pp.length == 12) {
+ // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
+ return new Loadable[]{new PubFailRecord(pp)};
+ }
+ if ("DLX".equals(rtype) && pp.length == 7) {
+ // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
+ return new Loadable[]{new DeliveryExtraRecord(pp)};
+ }
+ if ("LOG".equals(rtype) && (pp.length == 19 || pp.length == 20)) {
+ // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
+ return new Loadable[]{new LogRecord(pp)};
+ }
+ }
+ logger.warn("PROV8002 bad record: " + line);
+ return new Loadable[0];
+ }
+
+ /**
+ * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
+ *
+ * @param a ignored
+ */
+ public static void main(String[] a) throws InterruptedException {
+ LogfileLoader.getLoader();
+ Thread.sleep(200000L);
+ }
+}