/*******************************************************************************
 * ============LICENSE_START==================================================
 * * ===========================================================================
 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * * ===========================================================================
 * * Licensed under the Apache License, Version 2.0 (the "License");
 * * you may not use this file except in compliance with the License.
 * * You may obtain a copy of the License at
 * *
 * *      http://www.apache.org/licenses/LICENSE-2.0
 * *
 * * Unless required by applicable law or agreed to in writing, software
 * * distributed under the License is distributed on an "AS IS" BASIS,
 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * * See the License for the specific language governing permissions and
 * * limitations under the License.
 * * ============LICENSE_END====================================================
 * *
 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 * *
 ******************************************************************************/
package org.onap.dmaap.datarouter.provisioning.utils;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.Reader;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;
import java.util.zip.GZIPInputStream;

import org.onap.dmaap.datarouter.provisioning.BaseServlet;
import org.onap.dmaap.datarouter.provisioning.ProvRunner;
import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
/**
 * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.
 * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be
 * two places where records can be loaded into this table: here, and in the method DB.retroFit4(), which may be run at
 * startup to load the old (1.0) style log tables into LOG_RECORDS.
 * <p>This class maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
 * database. This bit set is used to synchronize between provisioning servers.</p>
 *
 * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
 */
public class LogfileLoader extends Thread {
    /**
     * NOT USED: Percentage of free space required before old records are removed.
     */
    public static final int REQUIRED_FREE_PCT = 20;

    /**
     * This is a singleton -- there is only one LogfileLoader object in the server.
     */
    private static LogfileLoader logfileLoader;

    /**
     * Each server can assign this many IDs.
     */
    private static final long SET_SIZE = (1L << 56);

    private final EELFLogger logger;
    private final String spooldir;
    private final long setStart;
    private final long setEnd;
    private RLEBitSet seqSet;
    private long nextId;
    private boolean idle;
    private LogfileLoader() {
        this.logger = EELFManager.getInstance().getLogger("InternalLog");
        this.spooldir = ProvRunner.getProvProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
        this.setStart = getIdRange();
        this.setEnd = setStart + SET_SIZE - 1;
        this.seqSet = new RLEBitSet();
        this.nextId = 0;
        this.idle = false;
        this.setDaemon(true);
        this.setName("LogfileLoader");
    }
    /**
     * Get the singleton LogfileLoader object, and start it if it is not running.
     *
     * @return the LogfileLoader
     */
    public static synchronized LogfileLoader getLoader() {
        if (logfileLoader == null) {
            logfileLoader = new LogfileLoader();
        }
        if (!logfileLoader.isAlive()) {
            logfileLoader.start();
        }
        return logfileLoader;
    }
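
    // Record IDs are partitioned into disjoint ranges of SET_SIZE (2^56) IDs, one range
    // per provisioning server role, so that no two servers assign the same RECORD_ID.
    // For example, the range starting at 0 covers [0x0 .. 0xFFFFFFFFFFFFFF], and the
    // next range starts at 0x100000000000000.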
    private long getIdRange() {
        long size;
        if (BaseServlet.isInitialActivePOD()) {
            size = 0;
        } else if (BaseServlet.isInitialStandbyPOD()) {
            size = SET_SIZE;
        } else {
            size = SET_SIZE * 2;
        }
        String range = String.format("[%X .. %X]", size, size + SET_SIZE - 1);
        logger.debug("This server shall assign RECORD_IDs in the range " + range);
        return size;
    }
    /**
     * Return the bit set representing the record IDs that are loaded in this database.
     *
     * @return the bit set
     */
    public RLEBitSet getBitSet() {
        return seqSet;
    }
    /**
     * True if the LogfileLoader is currently waiting for work.
     *
     * @return true if idle
     */
    public boolean isIdle() {
        return idle;
    }
    /**
     * Run continuously to look for new logfiles in the spool directory and import them into the DB.
     * The spool is checked once per second. If the LOG_RECORDS table grows beyond the configured
     * log retention threshold (see {@link Parameters#PROV_LOG_RETENTION}), the oldest entries are
     * removed and the table is compacted.
     */
    @Override
    public void run() {
        initializeNextid();
        while (true) {
            try {
                File dirfile = new File(spooldir);
                runLogFileLoad(dirfile);
            } catch (Exception e) {
                logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
            }
        }
    }
    private void runLogFileLoad(File filesDir) {
        File[] inFiles = filesDir.listFiles((dir, name) -> name.startsWith("IN."));
        if (inFiles != null) {
            if (inFiles.length == 0) {
                idle = true;
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                idle = false;
            } else {
                // Remove old rows
                if (pruneRecords()) {
                    // Removed at least some entries, recompute the bit map
                    initializeNextid();
                }
                for (File file : inFiles) {
                    processFile(file);
                }
            }
        }
    }
    private void processFile(File infile) {
        if (logger.isDebugEnabled()) {
            logger.debug("PROV8001 Starting " + infile + " ...");
        }
        long time = System.currentTimeMillis();
        int[] array = process(infile);
        time = System.currentTimeMillis() - time;
        logger.info(String.format("PROV8000 Processed %s in %d ms; %d of %d records.",
            infile.toString(), time, array[0], array[1]));
        try {
            Files.delete(infile.toPath());
        } catch (IOException e) {
            logger.info("PROV8001 failed to delete file " + infile.getName(), e);
        }
    }
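
    // Pruning keeps LOG_RECORDS near a configurable size: when the table holds more rows
    // than the retention threshold, a per-day histogram is used to find a cutoff day such
    // that deleting all older rows removes at least the excess. For example, with a
    // threshold of 1,000,000 and 1,250,000 rows present, days are summed oldest-first
    // until the running total reaches 250,000, and rows before the cutoff are deleted.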
    boolean pruneRecords() {
        boolean did1 = false;
        long count = countRecords();
        Parameters defaultLogRetention = Parameters.getParameter(Parameters.DEFAULT_LOG_RETENTION);
        long threshold = (defaultLogRetention != null) ? Long.parseLong(defaultLogRetention.getValue()) : 1000000L;
        Parameters provLogRetention = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
        if (provLogRetention != null) {
            try {
                long retention = Long.parseLong(provLogRetention.getValue());
                // This check is to prevent inadvertent errors from wiping the table out
                if (retention > 1000000L) {
                    threshold = retention;
                }
            } catch (NumberFormatException e) {
                // ignore
            }
        }
        logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
        if (count > threshold) {
            // we need to remove this many records
            long nrecs = count - threshold;
            // histogram of records per day
            Map<Long, Long> hist = getHistogram();
            // Determine the cutoff point to remove the needed number of records
            long sum = 0;
            long cutoff = 0;
            for (Long day : new TreeSet<>(hist.keySet())) {
                sum += hist.get(day);
                if (sum >= nrecs) {
                    cutoff = day;
                    break;
                }
            }
            cutoff++;
            // convert the day to a time in ms
            cutoff *= 86400000L;
            logger.debug("  Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
            try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
                // Limit to a million at a time to avoid tying up the DB for too long.
                try (PreparedStatement ps = conn.prepareStatement(
                    "DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
                    ps.setLong(1, cutoff);
                    while (count > 0) {
                        if (!ps.execute()) {
                            int dcount = ps.getUpdateCount();
                            count -= dcount;
                            logger.debug("  " + dcount + " rows deleted.");
                            did1 |= (dcount != 0);
                            if (dcount == 0) {
                                count = 0;    // prevent inf. loops
                            }
                        } else {
                            count = 0;    // shouldn't happen!
                        }
                    }
                }
                try (Statement stmt = conn.createStatement()) {
                    stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
                }
            } catch (SQLException e) {
                logger.error("LogfileLoader.pruneRecords: " + e.getMessage(), e);
            }
        }
        return did1;
    }
    private long countRecords() {
        long count = 0;
        try (Connection conn = ProvDbUtils.getInstance().getConnection();
            PreparedStatement ps = conn.prepareStatement("SELECT COUNT(*) as COUNT from LOG_RECORDS");
            ResultSet rs = ps.executeQuery()) {
            if (rs.next()) {
                count = rs.getLong("COUNT");
            }
        } catch (SQLException e) {
            logger.error("LogfileLoader.countRecords: " + e.getMessage(), e);
        }
        return count;
    }
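
    // EVENT_TIME is epoch milliseconds, so FLOOR(EVENT_TIME/86400000) in the query below
    // buckets records by whole days since the epoch (86,400,000 ms per day).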
    private Map<Long, Long> getHistogram() {
        Map<Long, Long> map = new HashMap<>();
        try (Connection conn = ProvDbUtils.getInstance().getConnection();
            PreparedStatement ps = conn.prepareStatement(
                "SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY");
            ResultSet rs = ps.executeQuery()) {
            logger.debug("  LOG_RECORD table histogram...");
            while (rs.next()) {
                long day = rs.getLong("DAY");
                long cnt = rs.getLong("COUNT");
                map.put(day, cnt);
                logger.debug("  " + day + "  " + cnt);
            }
        } catch (SQLException e) {
            logger.error("LogfileLoader.getHistogram: " + e.getMessage(), e);
        }
        return map;
    }
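
    // Rebuilds seqSet from the RECORD_IDs already in the database, then positions nextId
    // at the highest ID already used within this server's range (process() increments it
    // before assigning), handling wraparound when the range has been exhausted.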
    private void initializeNextid() {
        try (Connection conn = ProvDbUtils.getInstance().getConnection()) {
            RLEBitSet nbs = new RLEBitSet();
            try (Statement stmt = conn.createStatement()) {
                // Build a bitset of all records in the LOG_RECORDS table
                // We need to run this SELECT in stages, because otherwise we run out of memory!
                final long stepsize = 6000000L;
                boolean goAgain = true;
                for (long i = 0; goAgain; i += stepsize) {
                    String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
                    try (ResultSet rs = stmt.executeQuery(sql)) {
                        goAgain = false;
                        while (rs.next()) {
                            long recordId = rs.getLong("RECORD_ID");
                            nbs.set(recordId);
                            goAgain = true;
                        }
                    }
                }
            }
            seqSet = nbs;
            // Compare with the range for this server
            // Determine the next ID for this set of record IDs
            RLEBitSet tbs = (RLEBitSet) nbs.clone();
            RLEBitSet idset = new RLEBitSet();
            idset.set(setStart, setStart + SET_SIZE);
            tbs.and(idset);
            long bitLength = tbs.length();
            nextId = (bitLength == 0) ? setStart : (bitLength - 1);
            if (nextId >= setStart + SET_SIZE) {
                // Handle wraparound, when the IDs reach the end of our "range"
                Long[] last = null;
                Iterator<Long[]> li = tbs.getRangeIterator();
                while (li.hasNext()) {
                    last = li.next();
                }
                if (last != null) {
                    tbs.clear(last[0], last[1] + 1);
                    bitLength = tbs.length();
                    nextId = (bitLength == 0) ? setStart : (bitLength - 1);
                }
            }
            logger.debug(String.format("LogfileLoader.initializeNextid, next ID is %d (%x)", nextId, nextId));
        } catch (SQLException e) {
            logger.error("LogfileLoader.initializeNextid: " + e.getMessage(), e);
        }
    }
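
    // Loads one spool file into LOG_RECORDS. Returns {ok, total}: the number of records
    // successfully inserted and the number of lines read. LOG records carry their own
    // RECORD_ID and are skipped if seqSet already contains it; all other record types
    // are assigned the next ID from this server's range.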
    @SuppressWarnings("resource")
    int[] process(File file) {
        int ok = 0;
        int total = 0;
        try (Connection conn = ProvDbUtils.getInstance().getConnection();
            PreparedStatement ps = conn.prepareStatement(
                "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) {
            Reader reader = file.getPath().endsWith(".gz")
                ? new InputStreamReader(new GZIPInputStream(new FileInputStream(file)))
                : new FileReader(file);
            try (LineNumberReader in = new LineNumberReader(reader)) {
                String line;
                while ((line = in.readLine()) != null) {
                    try {
                        for (Loadable rec : buildRecords(line)) {
                            rec.load(ps);
                            if (rec instanceof LogRecord) {
                                LogRecord lr = ((LogRecord) rec);
                                if (!seqSet.get(lr.getRecordId())) {
                                    ps.executeUpdate();
                                    seqSet.set(lr.getRecordId());
                                } else {
                                    logger.debug("Duplicate record ignored: " + lr.getRecordId());
                                }
                            } else {
                                if (++nextId > setEnd) {
                                    nextId = setStart;
                                }
                                ps.setLong(18, nextId);
                                ps.executeUpdate();
                                seqSet.set(nextId);
                            }
                            ps.clearParameters();
                            ok++;
                        }
                    } catch (SQLException e) {
                        logger.warn("PROV8003 Invalid value in record: " + line, e);
                    } catch (NumberFormatException e) {
                        logger.warn("PROV8004 Invalid number in record: " + line, e);
                    } catch (ParseException e) {
                        logger.warn("PROV8005 Invalid date in record: " + line, e);
                    } catch (Exception e) {
                        logger.warn("PROV8006 Invalid pattern in record: " + line, e);
                    }
                    total++;
                }
            }
        } catch (SQLException | IOException e) {
            logger.warn("PROV8007 Exception reading " + file + ": " + e);
        }
        return new int[]{ok, total};
    }
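
    // Each spool-file line is a '|'-separated record whose second field names its type.
    // A hypothetical PUB line, for illustration only (11 fields, matching the check below):
    //   2017-06-01T12:00:00.000Z|PUB|12345|7|https://dr.example.org/publish/7/file.gz|PUT|application/gzip|1234|10.0.0.1|user1|204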
    Loadable[] buildRecords(String line) throws ParseException {
        String[] pp = line.split("\\|");
        if (pp != null && pp.length >= 7) {
            String rtype = pp[1].toUpperCase();
            if ("PUB".equals(rtype) && pp.length == 11) {
                // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
                return new Loadable[]{new PublishRecord(pp)};
            }
            if ("DEL".equals(rtype) && pp.length == 12) {
                // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
                String[] subs = pp[4].split("\\s+");
                if (subs != null) {
                    Loadable[] rv = new Loadable[subs.length];
                    for (int i = 0; i < subs.length; i++) {
                        // create a new record for each individual sub
                        pp[4] = subs[i];
                        rv[i] = new DeliveryRecord(pp);
                    }
                    return rv;
                }
            }
            if ("EXP".equals(rtype) && pp.length == 11) {
                // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
                ExpiryRecord expiryRecord = new ExpiryRecord(pp);
                if ("other".equals(expiryRecord.getReason())) {
                    logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: "
                        + expiryRecord.getPublishId());
                }
                return new Loadable[]{expiryRecord};
            }
            if ("PBF".equals(rtype) && pp.length == 12) {
                // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
                return new Loadable[]{new PubFailRecord(pp)};
            }
            if ("DLX".equals(rtype) && pp.length == 7) {
                // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
                return new Loadable[]{new DeliveryExtraRecord(pp)};
            }
            if ("LOG".equals(rtype) && (pp.length == 19 || pp.length == 20)) {
                // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|
                // feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
                return new Loadable[]{new LogRecord(pp)};
            }
        }
        logger.warn("PROV8002 bad record: " + line);
        return new Loadable[0];
    }
    /**
     * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
     */
    public static void main(String[] str) throws InterruptedException {
        LogfileLoader.getLoader();
        Thread.sleep(200000L);
    }
}