/*******************************************************************************
 * ============LICENSE_START==================================================
 * * org.onap.dmaap
 * * ===========================================================================
 * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * * ===========================================================================
 * * Licensed under the Apache License, Version 2.0 (the "License");
 * * you may not use this file except in compliance with the License.
 * * You may obtain a copy of the License at
 * *
 * *      http://www.apache.org/licenses/LICENSE-2.0
 * *
 * * Unless required by applicable law or agreed to in writing, software
 * * distributed under the License is distributed on an "AS IS" BASIS,
 * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * * See the License for the specific language governing permissions and
 * * limitations under the License.
 * * ============LICENSE_END====================================================
 * *
 * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 * *
 ******************************************************************************/

package org.onap.dmaap.datarouter.provisioning.utils;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.Reader;
import java.nio.file.Files;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeSet;
import java.util.zip.GZIPInputStream;

import org.onap.dmaap.datarouter.provisioning.BaseServlet;
import org.onap.dmaap.datarouter.provisioning.ProvRunner;
import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;

/**
 * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the
 * spooldir. These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there
 * should only be two places where records can be loaded into this table: here, and in the method DB.retroFit4(),
 * which may be run at startup to load the old (1.0) style log tables into LOG_RECORDS.
 *
 * <p>This class maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
 * database. This bit set is used to synchronize between provisioning servers.</p>
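 *
 * <p>A minimal usage sketch (illustrative only; all three methods are defined on this class):</p>
 * <pre>{@code
 *     LogfileLoader loader = LogfileLoader.getLoader(); // starts the daemon thread if not already running
 *     RLEBitSet loaded = loader.getBitSet();            // record IDs currently present in LOG_RECORDS
 *     boolean waiting = loader.isIdle();                // true when the spool directory is empty
 * }</pre>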
 *
 * @author Robert Eby
 * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
 */
public class LogfileLoader extends Thread {
    /**
     * NOT USED: Percentage of free space required before old records are removed.
     */
    public static final int REQUIRED_FREE_PCT = 20;

    /**
     * This is a singleton -- there is only one LogfileLoader object in the server.
     */
    private static LogfileLoader logfileLoader;

    /**
     * Each server can assign this many IDs.
     */
    private static final long SET_SIZE = (1L << 56);

    private final EELFLogger logger;
    private final String spooldir;
    private final long setStart;
    private final long setEnd;
    private RLEBitSet seqSet;
    private long nextId;
    private boolean idle;

    private LogfileLoader() {
        this.logger = EELFManager.getInstance().getLogger("InternalLog");
        this.spooldir = ProvRunner.getProvProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
        this.setStart = getIdRange();
        this.setEnd = setStart + SET_SIZE - 1;
        this.seqSet = new RLEBitSet();
        this.nextId = 0;
        this.idle = false;
        this.setDaemon(true);
        this.setName("LogfileLoader");
    }

    /**
     * Get the singleton LogfileLoader object, and start it if it is not running.
     *
     * @return the LogfileLoader
     */
    public static synchronized LogfileLoader getLoader() {
        if (logfileLoader == null) {
            logfileLoader = new LogfileLoader();
        }
        if (!logfileLoader.isAlive()) {
            logfileLoader.start();
        }
        return logfileLoader;
    }

    /**
     * Determine the starting point of the RECORD_ID range this server may assign, based on its POD role.
     */
    private long getIdRange() {
        long size;
        if (BaseServlet.isInitialActivePOD()) {
            size = 0;
        } else if (BaseServlet.isInitialStandbyPOD()) {
            size = SET_SIZE;
        } else {
            size = SET_SIZE * 2;
        }
        String range = String.format("[%X .. %X]", size, size + SET_SIZE - 1);
        logger.debug("This server shall assign RECORD_IDs in the range " + range);
        return size;
    }

    /**
     * Return the bit set representing the record IDs that are loaded in this database.
     *
     * @return the bit set
     */
    public RLEBitSet getBitSet() {
        return seqSet;
    }

    /**
     * True if the LogfileLoader is currently waiting for work.
     *
     * @return true if idle
     */
    public boolean isIdle() {
        return idle;
    }

    /**
     * Run continuously to look for new logfiles in the spool directory and import them into the DB.
     * The spool is checked once per second. Before new files are loaded, the LOG_RECORDS table is
     * pruned: when it holds more records than the configured retention threshold, the oldest entries
     * are removed until the record count drops back to the threshold. (The free-space check implied
     * by REQUIRED_FREE_PCT is not used.)
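     *
     * <p>A sketch of how that threshold is derived; it mirrors the logic in {@code pruneRecords()} below
     * (the parameter names are the real constants from {@link Parameters}; local variable names are
     * illustrative and error handling is elided):</p>
     * <pre>{@code
     *     long threshold = 1000000L;                          // hard floor, protects the table
     *     Parameters def = Parameters.getParameter(Parameters.DEFAULT_LOG_RETENTION);
     *     if (def != null) {
     *         threshold = Long.parseLong(def.getValue());     // normal default retention
     *     }
     *     Parameters prov = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
     *     if (prov != null && Long.parseLong(prov.getValue()) > 1000000L) {
     *         threshold = Long.parseLong(prov.getValue());    // override, but only above the floor
     *     }
     * }</pre>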
     */
    @Override
    public void run() {
        initializeNextid();
        while (true) {
            try {
                File dirfile = new File(spooldir);
                while (true) {
                    runLogFileLoad(dirfile);
                }
            } catch (Exception e) {
                // log and restart the inner loop; this thread must never die
                logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
            }
        }
    }

    private void runLogFileLoad(File filesDir) {
        // logfiles to be ingested are spooled as IN.* files
        File[] inFiles = filesDir.listFiles((dir, name) -> name.startsWith("IN."));
        if (inFiles != null) {
            if (inFiles.length == 0) {
                // nothing to do; mark ourselves idle and sleep for a second
                idle = true;
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                idle = false;
            } else {
                // Remove old rows
                if (pruneRecords()) {
                    // Removed at least some entries, recompute the bit map
                    initializeNextid();
                }
                for (File file : inFiles) {
                    processFile(file);
                }
            }
        }
    }

    private void processFile(File infile) {
        if (logger.isDebugEnabled()) {
            logger.debug("PROV8001 Starting " + infile + " ...");
        }
        long time = System.currentTimeMillis();
        int[] array = process(infile);
        time = System.currentTimeMillis() - time;
        logger.info(String.format("PROV8000 Processed %s in %d ms; %d of %d records.",
                infile.toString(), time, array[0], array[1]));
        try {
            // the spooled file is deleted once its records have been loaded
            Files.delete(infile.toPath());
        } catch (IOException e) {
            logger.info("PROV8001 failed to delete file " + infile.getName(), e);
        }
    }

    boolean pruneRecords() {
        boolean did1 = false;
        long count = countRecords();
        Parameters defaultLogRetention = Parameters.getParameter(Parameters.DEFAULT_LOG_RETENTION);
        long threshold = (defaultLogRetention != null) ? Long.parseLong(defaultLogRetention.getValue()) : 1000000L;
        Parameters provLogRetention = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
        if (provLogRetention != null) {
            try {
                long retention = Long.parseLong(provLogRetention.getValue());
                // This check is to prevent inadvertent errors from wiping the table out
                if (retention > 1000000L) {
                    threshold = retention;
                }
            } catch (NumberFormatException e) {
                // ignore
            }
        }
        logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
        if (count > threshold) {
            // we need to remove this many records
            count -= threshold;
            // histogram of records per day
            Map