--- /dev/null
+/*******************************************************************************\r
+ * ============LICENSE_START==================================================\r
+ * * org.onap.dmaap\r
+ * * ===========================================================================\r
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.\r
+ * * ===========================================================================\r
+ * * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * * you may not use this file except in compliance with the License.\r
+ * * You may obtain a copy of the License at\r
+ * * \r
+ * * http://www.apache.org/licenses/LICENSE-2.0\r
+ * * \r
+ * * Unless required by applicable law or agreed to in writing, software\r
+ * * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * * See the License for the specific language governing permissions and\r
+ * * limitations under the License.\r
+ * * ============LICENSE_END====================================================\r
+ * *\r
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.\r
+ * *\r
+ ******************************************************************************/\r
+\r
+\r
+package org.onap.dmaap.datarouter.provisioning.utils;\r
+\r
+import java.io.File;\r
+import java.io.FileInputStream;\r
+import java.io.FileNotFoundException;\r
+import java.io.FileReader;\r
+import java.io.FilenameFilter;\r
+import java.io.IOException;\r
+import java.io.InputStreamReader;\r
+import java.io.LineNumberReader;\r
+import java.io.Reader;\r
+import java.sql.Connection;\r
+import java.sql.PreparedStatement;\r
+import java.sql.ResultSet;\r
+import java.sql.SQLException;\r
+import java.sql.Statement;\r
+import java.text.ParseException;\r
+import java.util.Date;\r
+import java.util.HashMap;\r
+import java.util.Iterator;\r
+import java.util.Map;\r
+import java.util.TreeSet;\r
+import java.util.zip.GZIPInputStream;\r
+\r
+import org.apache.log4j.Logger;\r
+import org.onap.dmaap.datarouter.provisioning.BaseServlet;\r
+import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;\r
+import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;\r
+import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;\r
+import org.onap.dmaap.datarouter.provisioning.beans.Loadable;\r
+import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;\r
+import org.onap.dmaap.datarouter.provisioning.beans.Parameters;\r
+import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;\r
+import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;\r
+\r
+/**\r
+ * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.\r
+ * These logfiles are loaded into the MySQL LOG_RECORDS table. In a running provisioning server, there should only be\r
+ * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at\r
+ * startup to load the old (1.0) style log tables into LOG_RECORDS;\r
+ * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the\r
+ * database.\r
+ * This bit set is used to synchronize between provisioning servers.</p>\r
+ *\r
+ * @author Robert Eby\r
+ * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $\r
+ */\r
+public class LogfileLoader extends Thread {\r
+ /** Default number of log records to keep when pruning. Keep 10M by default. */\r
+ public static final long DEFAULT_LOG_RETENTION = 10000000L;\r
+ /** NOT USED: Percentage of free space required before old records are removed. */\r
+ public static final int REQUIRED_FREE_PCT = 20;\r
+\r
+ /** This is a singleton -- there is only one LogfileLoader object in the server */\r
+ private static LogfileLoader p;\r
+\r
+ /**\r
+ * Get the singleton LogfileLoader object, and start it if it is not running.\r
+ * @return the LogfileLoader\r
+ */\r
+ public static synchronized LogfileLoader getLoader() {\r
+ if (p == null)\r
+ p = new LogfileLoader();\r
+ if (!p.isAlive())\r
+ p.start();\r
+ return p;\r
+ }\r
+\r
+ /** The PreparedStatement which is loaded by a <i>Loadable</i>. */\r
+ public static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";\r
+ /** Each server can assign this many IDs */\r
+ private static final long SET_SIZE = (1L << 56);\r
+\r
+ private final Logger logger;\r
+ private final DB db;\r
+ private final String spooldir;\r
+ private final long set_start;\r
+ private final long set_end;\r
+ private RLEBitSet seq_set;\r
+ private long nextid;\r
+ private boolean idle;\r
+\r
+ private LogfileLoader() {\r
+ this.logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");\r
+ this.db = new DB();\r
+ this.spooldir = db.getProperties().getProperty("com.att.research.datarouter.provserver.spooldir");\r
+ this.set_start = getIdRange();\r
+ this.set_end = set_start + SET_SIZE - 1;\r
+ this.seq_set = new RLEBitSet();\r
+ this.nextid = 0;\r
+ this.idle = false;\r
+\r
+ // This is a potentially lengthy operation, so has been moved to run()\r
+ //initializeNextid();\r
+ this.setDaemon(true);\r
+ this.setName("LogfileLoader");\r
+ }\r
+\r
+ private long getIdRange() {\r
+ long n;\r
+ if (BaseServlet.isInitialActivePOD())\r
+ n = 0;\r
+ else if (BaseServlet.isInitialStandbyPOD())\r
+ n = SET_SIZE;\r
+ else\r
+ n = SET_SIZE * 2;\r
+ String r = String.format("[%X .. %X]", n, n+SET_SIZE-1);\r
+ logger.debug("This server shall assign RECORD_IDs in the range "+r);\r
+ return n;\r
+ }\r
+ /**\r
+ * Return the bit set representing the record ID's that are loaded in this database.\r
+ * @return the bit set\r
+ */\r
+ public RLEBitSet getBitSet() {\r
+ return seq_set;\r
+ }\r
+ /**\r
+ * True if the LogfileLoader is currently waiting for work.\r
+ * @return true if idle\r
+ */\r
+ public boolean isIdle() {\r
+ return idle;\r
+ }\r
+ /**\r
+ * Run continuously to look for new logfiles in the spool directory and import them into the DB.\r
+ * The spool is checked once per second. If free space on the MySQL filesystem falls below\r
+ * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS\r
+ * table is compacted until free space rises above the threshold.\r
+ */\r
+ @Override\r
+ public void run() {\r
+ initializeNextid(); // moved from the constructor\r
+ while (true) {\r
+ try {\r
+ File dirfile = new File(spooldir);\r
+ while (true) {\r
+ // process IN files\r
+ File[] infiles = dirfile.listFiles(new FilenameFilter() {\r
+ @Override\r
+ public boolean accept(File dir, String name) {\r
+ return name.startsWith("IN.");\r
+ }\r
+ });\r
+\r
+ if (infiles.length == 0) {\r
+ idle = true;\r
+ try {\r
+ Thread.sleep(1000L);\r
+ } catch (InterruptedException e) {\r
+ }\r
+ idle = false;\r
+ } else {\r
+ // Remove old rows\r
+ if (pruneRecords()) {\r
+ // Removed at least some entries, recompute the bit map\r
+ initializeNextid();\r
+ }\r
+\r
+ // Process incoming logfiles\r
+ for (File f : infiles) {\r
+ if (logger.isDebugEnabled())\r
+ logger.debug("PROV8001 Starting " + f + " ...");\r
+ long time = System.currentTimeMillis();\r
+ int[] n = process(f);\r
+ time = System.currentTimeMillis() - time;\r
+ logger.info(String\r
+ .format("PROV8000 Processed %s in %d ms; %d of %d records.",\r
+ f.toString(), time, n[0], n[1]));\r
+ f.delete();\r
+ }\r
+ }\r
+ }\r
+ } catch (Exception e) {\r
+ logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);\r
+ e.printStackTrace();\r
+ }\r
+ }\r
+ }\r
+ private boolean pruneRecords() {\r
+ boolean did1 = false;\r
+ long count = countRecords();\r
+ long threshold = DEFAULT_LOG_RETENTION;\r
+ Parameters param = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);\r
+ if (param != null) {\r
+ try {\r
+ long n = Long.parseLong(param.getValue());\r
+ // This check is to prevent inadvertent errors from wiping the table out\r
+ if (n > 1000000L)\r
+ threshold = n;\r
+ } catch (NumberFormatException e) {\r
+ // ignore\r
+ }\r
+ }\r
+ logger.debug("Pruning LOG_RECORD table: records in DB="+count+", threshold="+threshold);\r
+ if (count > threshold) {\r
+ count -= threshold; // we need to remove this many records;\r
+ Map<Long,Long> hist = getHistogram(); // histogram of records per day\r
+ // Determine the cutoff point to remove the needed number of records\r
+ long sum = 0;\r
+ long cutoff = 0;\r
+ for (Long day : new TreeSet<Long>(hist.keySet())) {\r
+ sum += hist.get(day);\r
+ cutoff = day;\r
+ if (sum >= count)\r
+ break;\r
+ }\r
+ cutoff++;\r
+ cutoff *= 86400000L; // convert day to ms\r
+ logger.debug(" Pruning records older than="+(cutoff/86400000L)+" ("+new Date(cutoff)+")");\r
+\r
+ Connection conn = null;\r
+ try {\r
+ // Limit to a million at a time to avoid typing up the DB for too long.\r
+ conn = db.getConnection();\r
+ PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000");\r
+ ps.setLong(1, cutoff);\r
+ while (count > 0) {\r
+ if (!ps.execute()) {\r
+ int dcount = ps.getUpdateCount();\r
+ count -= dcount;\r
+ logger.debug(" "+dcount+" rows deleted.");\r
+ did1 |= (dcount!=0);\r
+ if (dcount == 0)\r
+ count = 0; // prevent inf. loops\r
+ } else {\r
+ count = 0; // shouldn't happen!\r
+ }\r
+ }\r
+ ps.close();\r
+ Statement stmt = conn.createStatement();\r
+ stmt.execute("OPTIMIZE TABLE LOG_RECORDS");\r
+ stmt.close();\r
+ } catch (SQLException e) {\r
+ System.err.println(e);\r
+ e.printStackTrace();\r
+ } finally {\r
+ db.release(conn);\r
+ }\r
+ }\r
+ return did1;\r
+ }\r
+ private long countRecords() {\r
+ long count = 0;\r
+ Connection conn = null;\r
+ try {\r
+ conn = db.getConnection();\r
+ Statement stmt = conn.createStatement();\r
+ ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS");\r
+ if (rs.next()) {\r
+ count = rs.getLong("COUNT");\r
+ }\r
+ rs.close();\r
+ stmt.close();\r
+ } catch (SQLException e) {\r
+ System.err.println(e);\r
+ e.printStackTrace();\r
+ } finally {\r
+ db.release(conn);\r
+ }\r
+ return count;\r
+ }\r
+ private Map<Long,Long> getHistogram() {\r
+ Map<Long,Long> map = new HashMap<Long,Long>();\r
+ Connection conn = null;\r
+ try {\r
+ logger.debug(" LOG_RECORD table histogram...");\r
+ conn = db.getConnection();\r
+ Statement stmt = conn.createStatement();\r
+ ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY");\r
+ while (rs.next()) {\r
+ long day = rs.getLong("DAY");\r
+ long cnt = rs.getLong("COUNT");\r
+ map.put(day, cnt);\r
+ logger.debug(" "+day + " "+cnt);\r
+ }\r
+ rs.close();\r
+ stmt.close();\r
+ } catch (SQLException e) {\r
+ System.err.println(e);\r
+ e.printStackTrace();\r
+ } finally {\r
+ db.release(conn);\r
+ }\r
+ return map;\r
+ }\r
+ private void initializeNextid() {\r
+ Connection conn = null;\r
+ try {\r
+ conn = db.getConnection();\r
+ Statement stmt = conn.createStatement();\r
+ // Build a bitset of all records in the LOG_RECORDS table\r
+ // We need to run this SELECT in stages, because otherwise we run out of memory!\r
+ RLEBitSet nbs = new RLEBitSet();\r
+ final long stepsize = 6000000L;\r
+ boolean go_again = true;\r
+ for (long i = 0; go_again; i += stepsize) {\r
+ String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);\r
+ ResultSet rs = stmt.executeQuery(sql);\r
+ go_again = false;\r
+ while (rs.next()) {\r
+ long n = rs.getLong("RECORD_ID");\r
+ nbs.set(n);\r
+ go_again = true;\r
+ }\r
+ rs.close();\r
+ }\r
+ stmt.close();\r
+ seq_set = nbs;\r
+\r
+ // Compare with the range for this server\r
+ // Determine the next ID for this set of record IDs\r
+ RLEBitSet tbs = (RLEBitSet) nbs.clone();\r
+ RLEBitSet idset = new RLEBitSet();\r
+ idset.set(set_start, set_start+SET_SIZE);\r
+ tbs.and(idset);\r
+ long t = tbs.length();\r
+ nextid = (t == 0) ? set_start : (t - 1);\r
+ if (nextid >= set_start+SET_SIZE) {\r
+ // Handle wraparound, when the IDs reach the end of our "range"\r
+ Long[] last = null;\r
+ Iterator<Long[]> li = tbs.getRangeIterator();\r
+ while (li.hasNext()) {\r
+ last = li.next();\r
+ }\r
+ if (last != null) {\r
+ tbs.clear(last[0], last[1]+1);\r
+ t = tbs.length();\r
+ nextid = (t == 0) ? set_start : (t - 1);\r
+ }\r
+ }\r
+ logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextid, nextid));\r
+ } catch (SQLException e) {\r
+ System.err.println(e);\r
+ e.printStackTrace();\r
+ } finally {\r
+ db.release(conn);\r
+ }\r
+ }\r
+// OLD CODE - commented here for historical purposes\r
+//\r
+// private boolean pruneRecordsOldAlgorithm() {\r
+// // Determine space available -- available space must be at least 20% under /opt/app/mysql\r
+// int pct = getFreePercentage();\r
+// boolean did1 = false;\r
+// while (pct < REQUIRED_FREE_PCT) {\r
+// logger.info("PROV8008: Free space is " + pct + "% - removing old log entries");\r
+// boolean didit = removeOldestEntries();\r
+// pct = didit ? getFreePercentage() : 100; // don't loop endlessly\r
+// did1 |= didit;\r
+// }\r
+// return did1;\r
+// }\r
+// private int getFreePercentage() {\r
+// FileSystem fs = (Paths.get("/opt/app/mysql")).getFileSystem();\r
+// long total = 0;\r
+// long avail = 0;\r
+// try {\r
+// for (FileStore store : fs.getFileStores()) {\r
+// total += store.getTotalSpace();\r
+// avail += store.getUsableSpace();\r
+// }\r
+// } catch (IOException e) {\r
+// }\r
+// try { fs.close(); } catch (Exception e) { }\r
+// return (int)((avail * 100) / total);\r
+// }\r
+// private boolean removeOldestEntries() {\r
+// // Remove the last days worth of entries\r
+// Connection conn = null;\r
+// try {\r
+// conn = db.getConnection();\r
+// Statement stmt = conn.createStatement();\r
+// ResultSet rs = stmt.executeQuery("select min(event_time) as MIN from LOG_RECORDS");\r
+// if (rs != null) {\r
+// if (rs.next()) {\r
+// // Compute the end of the first day of logs\r
+// long first = rs.getLong("MIN");\r
+// Calendar cal = new GregorianCalendar();\r
+// cal.setTime(new Date(first));\r
+// cal.add(Calendar.DAY_OF_YEAR, 1);\r
+// cal.set(Calendar.HOUR_OF_DAY, 0);\r
+// cal.set(Calendar.MINUTE, 0);\r
+// cal.set(Calendar.SECOND, 0);\r
+// cal.set(Calendar.MILLISECOND, 0);\r
+// if (!stmt.execute("delete from LOG_RECORDS where event_time < " + cal.getTimeInMillis())) {\r
+// int count = stmt.getUpdateCount();\r
+// logger.info("PROV0009: Removed "+count+" old log entries.");\r
+// stmt.execute("OPTIMIZE TABLE LOG_RECORDS");\r
+// }\r
+// rs.close();\r
+// stmt.close();\r
+// return true;\r
+// }\r
+// rs.close();\r
+// }\r
+// stmt.close();\r
+// } catch (SQLException e) {\r
+// System.err.println(e);\r
+// e.printStackTrace();\r
+// } finally {\r
+// db.release(conn);\r
+// }\r
+// return false;\r
+// }\r
+ @SuppressWarnings("resource")\r
+ private int[] process(File f) {\r
+ int ok = 0, total = 0;\r
+ try {\r
+ Connection conn = db.getConnection();\r
+ PreparedStatement ps = conn.prepareStatement(INSERT_SQL);\r
+ Reader r = f.getPath().endsWith(".gz")\r
+ ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))\r
+ : new FileReader(f);\r
+ LineNumberReader in = new LineNumberReader(r);\r
+ String line;\r
+ while ((line = in.readLine()) != null) {\r
+ try {\r
+ for (Loadable rec : buildRecords(line)) {\r
+ rec.load(ps);\r
+ if (rec instanceof LogRecord) {\r
+ LogRecord lr = ((LogRecord)rec);\r
+ if (!seq_set.get(lr.getRecordId())) {\r
+ ps.executeUpdate();\r
+ seq_set.set(lr.getRecordId());\r
+ } else\r
+ logger.debug("Duplicate record ignored: "+lr.getRecordId());\r
+ } else {\r
+ if (++nextid > set_end)\r
+ nextid = set_start;\r
+ ps.setLong(18, nextid);\r
+ ps.executeUpdate();\r
+ seq_set.set(nextid);\r
+ }\r
+ ps.clearParameters();\r
+ ok++;\r
+ }\r
+ } catch (SQLException e) {\r
+ logger.warn("PROV8003 Invalid value in record: "+line);\r
+ logger.debug(e);\r
+ e.printStackTrace();\r
+ } catch (NumberFormatException e) {\r
+ logger.warn("PROV8004 Invalid number in record: "+line);\r
+ logger.debug(e);\r
+ e.printStackTrace();\r
+ } catch (ParseException e) {\r
+ logger.warn("PROV8005 Invalid date in record: "+line);\r
+ logger.debug(e);\r
+ e.printStackTrace();\r
+ } catch (Exception e) {\r
+ logger.warn("PROV8006 Invalid pattern in record: "+line);\r
+ logger.debug(e);\r
+ e.printStackTrace();\r
+ }\r
+ total++;\r
+ }\r
+ in.close();\r
+ ps.close();\r
+ db.release(conn);\r
+ conn = null;\r
+ } catch (FileNotFoundException e) {\r
+ logger.warn("PROV8007 Exception reading "+f+": "+e);\r
+ } catch (IOException e) {\r
+ logger.warn("PROV8007 Exception reading "+f+": "+e);\r
+ } catch (SQLException e) {\r
+ logger.warn("PROV8007 Exception reading "+f+": "+e);\r
+ }\r
+ return new int[] { ok, total };\r
+ }\r
+ private Loadable[] buildRecords(String line) throws ParseException {\r
+ String[] pp = line.split("\\|");\r
+ if (pp != null && pp.length >= 7) {\r
+ String rtype = pp[1].toUpperCase();\r
+ if (rtype.equals("PUB") && pp.length == 11) {\r
+ // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status\r
+ return new Loadable[] { new PublishRecord(pp) };\r
+ }\r
+ if (rtype.equals("DEL") && pp.length == 12) {\r
+ // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid\r
+ String[] subs = pp[4].split("\\s+");\r
+ if (subs != null) {\r
+ Loadable[] rv = new Loadable[subs.length];\r
+ for (int i = 0; i < subs.length; i++) {\r
+ // create a new record for each individual sub\r
+ pp[4] = subs[i];\r
+ rv[i] = new DeliveryRecord(pp);\r
+ }\r
+ return rv;\r
+ }\r
+ }\r
+ if (rtype.equals("EXP") && pp.length == 11) {\r
+ // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts\r
+ ExpiryRecord e = new ExpiryRecord(pp);\r
+ if (e.getReason().equals("other"))\r
+ logger.info("Invalid reason '"+pp[9]+"' changed to 'other' for record: "+e.getPublishId());\r
+ return new Loadable[] { e };\r
+ }\r
+ if (rtype.equals("PBF") && pp.length == 12) {\r
+ // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error\r
+ return new Loadable[] { new PubFailRecord(pp) };\r
+ }\r
+ if (rtype.equals("DLX") && pp.length == 7) {\r
+ // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent\r
+ return new Loadable[] { new DeliveryExtraRecord(pp) };\r
+ }\r
+ if (rtype.equals("LOG") && (pp.length == 19 || pp.length == 20)) {\r
+ // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id\r
+ return new Loadable[] { new LogRecord(pp) };\r
+ }\r
+ }\r
+ logger.warn("PROV8002 bad record: "+line);\r
+ return new Loadable[0];\r
+ }\r
+\r
+ /**\r
+ * The LogfileLoader can be run stand-alone by invoking the main() method of this class.\r
+ * @param a ignored\r
+ * @throws InterruptedException\r
+ */\r
+ public static void main(String[] a) throws InterruptedException {\r
+ LogfileLoader.getLoader();\r
+ Thread.sleep(200000L);\r
+ }\r
+}\r