1 /*******************************************************************************
\r
2 * ============LICENSE_START==================================================
\r
4 * * ===========================================================================
\r
5 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
\r
6 * * ===========================================================================
\r
7 * * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * * you may not use this file except in compliance with the License.
\r
9 * * You may obtain a copy of the License at
\r
11 * * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * * Unless required by applicable law or agreed to in writing, software
\r
14 * * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * * See the License for the specific language governing permissions and
\r
17 * * limitations under the License.
\r
18 * * ============LICENSE_END====================================================
\r
20 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
\r
22 ******************************************************************************/
\r
25 package org.onap.dmaap.datarouter.provisioning.utils;
\r
27 import java.io.File;
\r
28 import java.io.FileInputStream;
\r
29 import java.io.FileNotFoundException;
\r
30 import java.io.FileReader;
\r
31 import java.io.FilenameFilter;
\r
32 import java.io.IOException;
\r
33 import java.io.InputStreamReader;
\r
34 import java.io.LineNumberReader;
\r
35 import java.io.Reader;
\r
36 import java.sql.Connection;
\r
37 import java.sql.PreparedStatement;
\r
38 import java.sql.ResultSet;
\r
39 import java.sql.SQLException;
\r
40 import java.sql.Statement;
\r
41 import java.text.ParseException;
\r
42 import java.util.Date;
\r
43 import java.util.HashMap;
\r
44 import java.util.Iterator;
\r
45 import java.util.Map;
\r
46 import java.util.TreeSet;
\r
47 import java.util.zip.GZIPInputStream;
\r
49 import org.apache.log4j.Logger;
\r
50 import org.onap.dmaap.datarouter.provisioning.BaseServlet;
\r
51 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
\r
52 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
\r
53 import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
\r
54 import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
\r
55 import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
\r
56 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
\r
57 import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
\r
58 import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
\r
61 * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.
\r
62 * These logfiles are loaded into the MySQL LOG_RECORDS table. In a running provisioning server, there should only be
\r
63 * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at
\r
64 * startup to load the old (1.0) style log tables into LOG_RECORDS;
\r
65 * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
\r
67 * This bit set is used to synchronize between provisioning servers.</p>
\r
69 * @author Robert Eby
\r
70 * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
\r
72 public class LogfileLoader extends Thread {

// NOTE(review): extraction artifact — the original file's line numbers are embedded
// in the text and many source lines are missing; comments describe only visible code.
73 /** Default number of log records to keep when pruning. Keep 10M by default. */

74 public static final long DEFAULT_LOG_RETENTION = 10000000L;

75 /** NOT USED: Percentage of free space required before old records are removed. */

76 public static final int REQUIRED_FREE_PCT = 20;

78 /** This is a singleton -- there is only one LogfileLoader object in the server */

79 private static LogfileLoader p;

// Javadoc fragment for getLoader() below (opening /** line missing in this extraction).
82 * Get the singleton LogfileLoader object, and start it if it is not running.

83 * @return the LogfileLoader
85 public static synchronized LogfileLoader getLoader() {

// Lazily creates the singleton. NOTE(review): the surrounding null-check,
// thread start() call, and return statement are missing from this extraction —
// TODO recover from upstream source before relying on this text.
87 p = new LogfileLoader();

93 /** The PreparedStatement which is loaded by a <i>Loadable</i>. */

94 public static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

95 /** Each server can assign this many IDs */

96 private static final long SET_SIZE = (1L << 56);

// Logger for internal provisioning events.
98 private final Logger logger;

// Project-local database access wrapper.
99 private final DB db;

// Directory scanned for incoming "IN." logfiles; read from provserver properties.
100 private final String spooldir;

// First RECORD_ID this server may assign (computed by getIdRange()).
101 private final long set_start;

// Last RECORD_ID this server may assign: set_start + SET_SIZE - 1.
102 private final long set_end;

// Bit set of RECORD_IDs already loaded into LOG_RECORDS; used to skip duplicates.
103 private RLEBitSet seq_set;

// Next RECORD_ID to assign to an incoming record that lacks one.
104 private long nextid;

// True while the loader thread is waiting for new files (see isIdle()).
105 private boolean idle;

// Private constructor (singleton): wires up logging, DB access, the spool
// directory, and this server's RECORD_ID range, then configures the thread.
107 private LogfileLoader() {

108 this.logger = Logger.getLogger("org.onap.dmaap.datarouter.provisioning.internal");

109 this.db = new DB();

// Spool directory comes from the provisioning-server properties file.
110 this.spooldir = db.getProperties().getProperty("com.att.research.datarouter.provserver.spooldir");

111 this.set_start = getIdRange();

112 this.set_end = set_start + SET_SIZE - 1;

// Starts empty; populated by initializeNextid() when run() begins.
113 this.seq_set = new RLEBitSet();

117 // This is a potentially lengthy operation, so has been moved to run()

118 //initializeNextid();

// Daemon thread so the JVM can exit without waiting for the loader.
119 this.setDaemon(true);

120 this.setName("LogfileLoader");

// Computes the base of this server's RECORD_ID range, which depends on its
// POD role (initial active / initial standby / other).
// NOTE(review): the branch bodies, the declaration of n, and the return
// statement are missing from this extraction — TODO recover before relying on it.
123 private long getIdRange() {

125 if (BaseServlet.isInitialActivePOD())

127 else if (BaseServlet.isInitialStandbyPOD())

131 String r = String.format("[%X .. %X]", n, n+SET_SIZE-1);

132 logger.debug("This server shall assign RECORD_IDs in the range "+r);

// Javadoc fragment (opening /** line missing in this extraction).
136 * Return the bit set representing the record ID's that are loaded in this database.

137 * @return the bit set

// NOTE(review): method body (presumably "return seq_set;") and closing brace
// are missing from this extraction.
139 public RLEBitSet getBitSet() {

// Javadoc fragment (opening /** line missing in this extraction).
143 * True if the LogfileLoader is currently waiting for work.

144 * @return true if idle

// NOTE(review): method body (presumably "return idle;") and closing brace
// are missing from this extraction.
146 public boolean isIdle() {

// Javadoc fragment (opening /** line missing in this extraction).
150 * Run continuously to look for new logfiles in the spool directory and import them into the DB.

151 * The spool is checked once per second. If free space on the MySQL filesystem falls below

152 * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS

153 * table is compacted until free space rises above the threshold.

// Main loop. NOTE(review): the enclosing while/try scaffolding, idle-flag
// updates, and file-delete/rename lines are missing from this extraction.
156 public void run() {

// Deferred from the constructor because rebuilding the bit set is slow.
157 initializeNextid(); // moved from the constructor

160 File dirfile = new File(spooldir);

162 // process IN files

// Only files named "IN.*" are treated as incoming logfiles.
163 File[] infiles = dirfile.listFiles(new FilenameFilter() {

165 public boolean accept(File dir, String name) {

166 return name.startsWith("IN.");

// Nothing to do: sleep one second, then (per the visible flow) prune.
170 if (infiles.length == 0) {

173 Thread.sleep(1000L);

174 } catch (InterruptedException e) {

// Pruning removed rows, so the in-memory bit map is stale — rebuild it.
179 if (pruneRecords()) {

180 // Removed at least some entries, recompute the bit map

181 initializeNextid();

184 // Process incoming logfiles

185 for (File f : infiles) {

186 if (logger.isDebugEnabled())

187 logger.debug("PROV8001 Starting " + f + " ...");

188 long time = System.currentTimeMillis();

// process() returns {records loaded OK, total records seen}.
189 int[] n = process(f);

190 time = System.currentTimeMillis() - time;

192 .format("PROV8000 Processed %s in %d ms; %d of %d records.",

193 f.toString(), time, n[0], n[1]));

// Catch-all keeps the loader thread alive across unexpected failures.
198 } catch (Exception e) {

199 logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);

200 e.printStackTrace();

// Removes the oldest LOG_RECORDS rows when the table exceeds the retention
// threshold (PROV_LOG_RETENTION parameter, else DEFAULT_LOG_RETENTION), then
// compacts the table. Returns true if any rows were deleted.
// NOTE(review): several lines (threshold clamp, cutoff computation, loop
// braces, db.release) are missing from this extraction.
204 private boolean pruneRecords() {

205 boolean did1 = false;

206 long count = countRecords();

207 long threshold = DEFAULT_LOG_RETENTION;

// Operator-configurable retention override, if present.
208 Parameters param = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);

209 if (param != null) {

211 long n = Long.parseLong(param.getValue());

212 // This check is to prevent inadvertent errors from wiping the table out

215 } catch (NumberFormatException e) {

219 logger.debug("Pruning LOG_RECORD table: records in DB="+count+", threshold="+threshold);

220 if (count > threshold) {

221 count -= threshold; // we need to remove this many records;

222 Map<Long,Long> hist = getHistogram(); // histogram of records per day

223 // Determine the cutoff point to remove the needed number of records

// Walk days oldest-first (TreeSet sorts the keys) accumulating counts.
226 for (Long day : new TreeSet<Long>(hist.keySet())) {

227 sum += hist.get(day);

233 cutoff *= 86400000L; // convert day to ms

234 logger.debug(" Pruning records older than="+(cutoff/86400000L)+" ("+new Date(cutoff)+")");

236 Connection conn = null;

238 // Limit to a million at a time to avoid tying up the DB for too long.

239 conn = db.getConnection();

240 PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000");

241 ps.setLong(1, cutoff);

242 while (count > 0) {

// execute() returns false for an update count (DELETE), true for a ResultSet.
243 if (!ps.execute()) {

244 int dcount = ps.getUpdateCount();

246 logger.debug(" "+dcount+" rows deleted.");

247 did1 |= (dcount!=0);

249 count = 0; // prevent inf. loops

251 count = 0; // shouldn't happen!

// Reclaim space after bulk deletes (MySQL-specific maintenance statement).
255 Statement stmt = conn.createStatement();

256 stmt.execute("OPTIMIZE TABLE LOG_RECORDS");

258 } catch (SQLException e) {

259 System.err.println(e);

260 e.printStackTrace();

// Counts the rows currently in LOG_RECORDS. NOTE(review): the count variable
// declaration, rs.next() guard, resource cleanup, and return statement are
// missing from this extraction.
267 private long countRecords() {

269 Connection conn = null;

271 conn = db.getConnection();

272 Statement stmt = conn.createStatement();

273 ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS");

275 count = rs.getLong("COUNT");

279 } catch (SQLException e) {

280 System.err.println(e);

281 e.printStackTrace();

// Builds a histogram of LOG_RECORDS rows per day (day = EVENT_TIME / 86400000 ms).
// Used by pruneRecords() to pick a deletion cutoff. NOTE(review): the map.put
// call, resource cleanup, and return statement are missing from this extraction.
287 private Map<Long,Long> getHistogram() {

288 Map<Long,Long> map = new HashMap<Long,Long>();

289 Connection conn = null;

291 logger.debug(" LOG_RECORD table histogram...");

292 conn = db.getConnection();

293 Statement stmt = conn.createStatement();

294 ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY");

295 while (rs.next()) {

296 long day = rs.getLong("DAY");

297 long cnt = rs.getLong("COUNT");

299 logger.debug(" "+day + " "+cnt);

303 } catch (SQLException e) {

304 System.err.println(e);

305 e.printStackTrace();

// Rebuilds seq_set (the bit set of RECORD_IDs present in LOG_RECORDS) by
// scanning the table in pages, then derives nextid — the next free ID in this
// server's [set_start, set_end] range — handling wraparound when the range is
// exhausted. NOTE(review): paging-termination logic, the bit-set and()/set
// calls, loop braces, and db.release are missing from this extraction.
311 private void initializeNextid() {

312 Connection conn = null;

314 conn = db.getConnection();

315 Statement stmt = conn.createStatement();

316 // Build a bitset of all records in the LOG_RECORDS table

317 // We need to run this SELECT in stages, because otherwise we run out of memory!

318 RLEBitSet nbs = new RLEBitSet();

// Page size for the staged SELECT below.
319 final long stepsize = 6000000L;

320 boolean go_again = true;

321 for (long i = 0; go_again; i += stepsize) {

// LIMIT offset,count pagination over RECORD_ID values.
322 String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);

323 ResultSet rs = stmt.executeQuery(sql);

325 while (rs.next()) {

326 long n = rs.getLong("RECORD_ID");

335 // Compare with the range for this server

336 // Determine the next ID for this set of record IDs

337 RLEBitSet tbs = (RLEBitSet) nbs.clone();

338 RLEBitSet idset = new RLEBitSet();

339 idset.set(set_start, set_start+SET_SIZE);

// length() of the masked set gives one past the highest used ID.
341 long t = tbs.length();

342 nextid = (t == 0) ? set_start : (t - 1);

343 if (nextid >= set_start+SET_SIZE) {

344 // Handle wraparound, when the IDs reach the end of our "range"

345 Long[] last = null;

346 Iterator<Long[]> li = tbs.getRangeIterator();

347 while (li.hasNext()) {

// Clear the last used range so length() reflects the previous gap.
350 if (last != null) {

351 tbs.clear(last[0], last[1]+1);

353 nextid = (t == 0) ? set_start : (t - 1);

356 logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextid, nextid));

357 } catch (SQLException e) {

358 System.err.println(e);

359 e.printStackTrace();

364 // OLD CODE - commented here for historical purposes
\r
366 // private boolean pruneRecordsOldAlgorithm() {
\r
367 // // Determine space available -- available space must be at least 20% under /opt/app/mysql
\r
368 // int pct = getFreePercentage();
\r
369 // boolean did1 = false;
\r
370 // while (pct < REQUIRED_FREE_PCT) {
\r
371 // logger.info("PROV8008: Free space is " + pct + "% - removing old log entries");
\r
372 // boolean didit = removeOldestEntries();
\r
373 // pct = didit ? getFreePercentage() : 100; // don't loop endlessly
\r
378 // private int getFreePercentage() {
\r
379 // FileSystem fs = (Paths.get("/opt/app/mysql")).getFileSystem();
\r
383 // for (FileStore store : fs.getFileStores()) {
\r
384 // total += store.getTotalSpace();
\r
385 // avail += store.getUsableSpace();
\r
387 // } catch (IOException e) {
\r
389 // try { fs.close(); } catch (Exception e) { }
\r
390 // return (int)((avail * 100) / total);
\r
392 // private boolean removeOldestEntries() {
\r
393 // // Remove the last days worth of entries
\r
394 // Connection conn = null;
\r
396 // conn = db.getConnection();
\r
397 // Statement stmt = conn.createStatement();
\r
398 // ResultSet rs = stmt.executeQuery("select min(event_time) as MIN from LOG_RECORDS");
\r
399 // if (rs != null) {
\r
400 // if (rs.next()) {
\r
401 // // Compute the end of the first day of logs
\r
402 // long first = rs.getLong("MIN");
\r
403 // Calendar cal = new GregorianCalendar();
\r
404 // cal.setTime(new Date(first));
\r
405 // cal.add(Calendar.DAY_OF_YEAR, 1);
\r
406 // cal.set(Calendar.HOUR_OF_DAY, 0);
\r
407 // cal.set(Calendar.MINUTE, 0);
\r
408 // cal.set(Calendar.SECOND, 0);
\r
409 // cal.set(Calendar.MILLISECOND, 0);
\r
410 // if (!stmt.execute("delete from LOG_RECORDS where event_time < " + cal.getTimeInMillis())) {
\r
411 // int count = stmt.getUpdateCount();
\r
412 // logger.info("PROV0009: Removed "+count+" old log entries.");
\r
413 // stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
\r
422 // } catch (SQLException e) {
\r
423 // System.err.println(e);
\r
424 // e.printStackTrace();
\r
426 // db.release(conn);
\r
// Loads one spool file (optionally gzipped) line by line into LOG_RECORDS via
// INSERT_SQL, skipping RECORD_IDs already present in seq_set and assigning
// nextid to non-LogRecord entries. Returns {records inserted OK, total seen}.
// NOTE(review): the try scaffolding, rec.load(ps) call, ok/total increments,
// stream close, and db.release are missing from this extraction.
430 @SuppressWarnings("resource")

431 private int[] process(File f) {

432 int ok = 0, total = 0;

434 Connection conn = db.getConnection();

435 PreparedStatement ps = conn.prepareStatement(INSERT_SQL);

// Transparently decompress ".gz" spool files.
436 Reader r = f.getPath().endsWith(".gz")

437 ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))

438 : new FileReader(f);

439 LineNumberReader in = new LineNumberReader(r);

441 while ((line = in.readLine()) != null) {

// One input line may expand to multiple records (e.g. DEL with several subs).
443 for (Loadable rec : buildRecords(line)) {

445 if (rec instanceof LogRecord) {

446 LogRecord lr = ((LogRecord)rec);

// LogRecords carry their own RECORD_ID; insert only if not already loaded.
447 if (!seq_set.get(lr.getRecordId())) {

448 ps.executeUpdate();

449 seq_set.set(lr.getRecordId());

451 logger.debug("Duplicate record ignored: "+lr.getRecordId());

// Non-LogRecord entries get the next ID from this server's range (with wraparound).
453 if (++nextid > set_end)

454 nextid = set_start;

// Column 18 of INSERT_SQL is the RECORD_ID.
455 ps.setLong(18, nextid);

456 ps.executeUpdate();

457 seq_set.set(nextid);

459 ps.clearParameters();

// Per-record failures are logged and skipped so one bad line
// does not abort the rest of the file.
462 } catch (SQLException e) {

463 logger.warn("PROV8003 Invalid value in record: "+line);

465 e.printStackTrace();

466 } catch (NumberFormatException e) {

467 logger.warn("PROV8004 Invalid number in record: "+line);

469 e.printStackTrace();

470 } catch (ParseException e) {

471 logger.warn("PROV8005 Invalid date in record: "+line);

473 e.printStackTrace();

474 } catch (Exception e) {

475 logger.warn("PROV8006 Invalid pattern in record: "+line);

477 e.printStackTrace();

485 } catch (FileNotFoundException e) {

486 logger.warn("PROV8007 Exception reading "+f+": "+e);

487 } catch (IOException e) {

488 logger.warn("PROV8007 Exception reading "+f+": "+e);

489 } catch (SQLException e) {

490 logger.warn("PROV8007 Exception reading "+f+": "+e);

492 return new int[] { ok, total };

// Parses one pipe-delimited logfile line into zero or more Loadable records,
// dispatching on the record-type field (PUB/DEL/EXP/PBF/DLX/LOG) and the
// expected field count for each type. Unrecognized lines are logged and yield
// an empty array. NOTE(review): some lines (pp[4] rewrite per sub, the EXP
// reason check, closing braces) are missing from this extraction.
494 private Loadable[] buildRecords(String line) throws ParseException {

495 String[] pp = line.split("\\|");

496 if (pp != null && pp.length >= 7) {

497 String rtype = pp[1].toUpperCase();

498 if (rtype.equals("PUB") && pp.length == 11) {

499 // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status

500 return new Loadable[] { new PublishRecord(pp) };

502 if (rtype.equals("DEL") && pp.length == 12) {

503 // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid

// The subid field may hold several space-separated subs; fan out one
// DeliveryRecord per sub.
504 String[] subs = pp[4].split("\\s+");

505 if (subs != null) {

506 Loadable[] rv = new Loadable[subs.length];

507 for (int i = 0; i < subs.length; i++) {

508 // create a new record for each individual sub

510 rv[i] = new DeliveryRecord(pp);

515 if (rtype.equals("EXP") && pp.length == 11) {

516 // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts

517 ExpiryRecord e = new ExpiryRecord(pp);

518 if (e.getReason().equals("other"))

519 logger.info("Invalid reason '"+pp[9]+"' changed to 'other' for record: "+e.getPublishId());

520 return new Loadable[] { e };

522 if (rtype.equals("PBF") && pp.length == 12) {

523 // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error

524 return new Loadable[] { new PubFailRecord(pp) };

526 if (rtype.equals("DLX") && pp.length == 7) {

527 // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent

528 return new Loadable[] { new DeliveryExtraRecord(pp) };

530 if (rtype.equals("LOG") && (pp.length == 19 || pp.length == 20)) {

531 // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id

532 return new Loadable[] { new LogRecord(pp) };

535 logger.warn("PROV8002 bad record: "+line);

536 return new Loadable[0];

// Javadoc fragment (opening /** line missing in this extraction).
540 * The LogfileLoader can be run stand-alone by invoking the main() method of this class.

542 * @throws InterruptedException

// Stand-alone entry point: starts the loader thread, then keeps the JVM alive
// for 200 seconds. NOTE(review): closing braces are missing — the extraction
// is truncated at this point.
544 public static void main(String[] a) throws InterruptedException {

545 LogfileLoader.getLoader();

546 Thread.sleep(200000L);
