afbadcd257551178d123adf21fd0e88da5a1a35b
[dmaap/datarouter.git] / datarouter-prov / src / main / java / org / onap / dmaap / datarouter / provisioning / utils / LogfileLoader.java
1 /*******************************************************************************
2  * ============LICENSE_START==================================================
3  * * org.onap.dmaap
4  * * ===========================================================================
5  * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * * ===========================================================================
7  * * Licensed under the Apache License, Version 2.0 (the "License");
8  * * you may not use this file except in compliance with the License.
9  * * You may obtain a copy of the License at
10  * *
11  *  *      http://www.apache.org/licenses/LICENSE-2.0
12  * *
13  *  * Unless required by applicable law or agreed to in writing, software
14  * * distributed under the License is distributed on an "AS IS" BASIS,
15  * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * * See the License for the specific language governing permissions and
17  * * limitations under the License.
18  * * ============LICENSE_END====================================================
19  * *
20  * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  * *
22  ******************************************************************************/
23
24
25 package org.onap.dmaap.datarouter.provisioning.utils;
26
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29 import java.io.File;
30 import java.io.FileInputStream;
31 import java.io.FileReader;
32 import java.io.IOException;
33 import java.io.InputStreamReader;
34 import java.io.LineNumberReader;
35 import java.io.Reader;
36 import java.nio.file.Files;
37 import java.sql.Connection;
38 import java.sql.PreparedStatement;
39 import java.sql.ResultSet;
40 import java.sql.SQLException;
41 import java.sql.Statement;
42 import java.text.ParseException;
43 import java.util.Date;
44 import java.util.HashMap;
45 import java.util.Iterator;
46 import java.util.Map;
47 import java.util.TreeSet;
48 import java.util.zip.GZIPInputStream;
49 import org.onap.dmaap.datarouter.provisioning.BaseServlet;
50 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
51 import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
52 import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
53 import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
54 import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
55 import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
56 import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
57 import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
58
59 /**
60  * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.
61  * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be
62  * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at
63  * startup to load the old (1.0) style log tables into LOG_RECORDS;
64  * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
65  * database.
66  * This bit set is used to synchronize between provisioning servers.</p>
67  *
68  * @author Robert Eby
69  * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
70  */
71 public class LogfileLoader extends Thread {
72     /**
73      * NOT USED: Percentage of free space required before old records are removed.
74      */
75     public static final int REQUIRED_FREE_PCT = 20;
76
77     /**
78      * This is a singleton -- there is only one LogfileLoader object in the server.
79      */
80     private static LogfileLoader logfileLoader;
81
82     /**
83      * The PreparedStatement which is loaded by a <i>Loadable</i>.
84      */
85     private static final String INSERT_SQL = "insert into LOG_RECORDS "
86                                                + "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
87     /**
88      * Each server can assign this many IDs.
89      */
90     private static final long SET_SIZE = (1L << 56);
91
92     private final EELFLogger logger;
93     private final DB db;
94     private final String spooldir;
95     private final long setStart;
96     private final long setEnd;
97     private RLEBitSet seqSet;
98     private long nextId;
99     private boolean idle;
100
101     private LogfileLoader() {
102         this.logger = EELFManager.getInstance().getLogger("InternalLog");
103         this.db = new DB();
104         this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
105         this.setStart = getIdRange();
106         this.setEnd = setStart + SET_SIZE - 1;
107         this.seqSet = new RLEBitSet();
108         this.nextId = 0;
109         this.idle = false;
110         this.setDaemon(true);
111         this.setName("LogfileLoader");
112     }
113
114     /**
115      * Get the singleton LogfileLoader object, and start it if it is not running.
116      *
117      * @return the LogfileLoader
118      */
119     public static synchronized LogfileLoader getLoader() {
120         if (logfileLoader == null) {
121             logfileLoader = new LogfileLoader();
122         }
123         if (!logfileLoader.isAlive()) {
124             logfileLoader.start();
125         }
126         return logfileLoader;
127     }
128
129     private long getIdRange() {
130         long size;
131         if (BaseServlet.isInitialActivePOD()) {
132             size = 0;
133         } else if (BaseServlet.isInitialStandbyPOD()) {
134             size = SET_SIZE;
135         } else {
136             size = SET_SIZE * 2;
137         }
138         String range = String.format("[%X .. %X]", size, size + SET_SIZE - 1);
139         logger.debug("This server shall assign RECORD_IDs in the range " + range);
140         return size;
141     }
142
143     /**
144      * Return the bit set representing the record ID's that are loaded in this database.
145      *
146      * @return the bit set
147      */
148     public RLEBitSet getBitSet() {
149         return seqSet;
150     }
151
152     /**
153      * True if the LogfileLoader is currently waiting for work.
154      *
155      * @return true if idle
156      */
157     public boolean isIdle() {
158         return idle;
159     }
160
161     /**
162      * Run continuously to look for new logfiles in the spool directory and import them into the DB.
163      * The spool is checked once per second.  If free space on the MariaDB filesystem falls below
164      * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS
165      * table is compacted until free space rises above the threshold.
166      */
167     @Override
168     public void run() {
169         initializeNextid();
170         while (true) {
171             try {
172                 File dirfile = new File(spooldir);
173                 while (true) {
174                     runLogFileLoad(dirfile);
175                 }
176             } catch (Exception e) {
177                 logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
178             }
179         }
180     }
181
182     private void runLogFileLoad(File filesDir) {
183         File[] inFiles = filesDir.listFiles((dir, name) -> name.startsWith("IN."));
184         if (inFiles != null) {
185             if (inFiles.length == 0) {
186                 idle = true;
187                 try {
188                     Thread.sleep(1000L);
189                 } catch (InterruptedException e) {
190                     Thread.currentThread().interrupt();
191                 }
192                 idle = false;
193             } else {
194                 // Remove old rows
195                 if (pruneRecords()) {
196                     // Removed at least some entries, recompute the bit map
197                     initializeNextid();
198                 }
199                 for (File file : inFiles) {
200                     processFile(file);
201                 }
202             }
203         }
204     }
205
206     private void processFile(File infile) {
207         if (logger.isDebugEnabled()) {
208             logger.debug("PROV8001 Starting " + infile + " ...");
209         }
210         long time = System.currentTimeMillis();
211         int[] array = process(infile);
212         time = System.currentTimeMillis() - time;
213         logger.info(String.format("PROV8000 Processed %s in %d ms; %d of %d records.",
214             infile.toString(), time, array[0], array[1]));
215         try {
216             Files.delete(infile.toPath());
217         } catch (IOException e) {
218             logger.info("PROV8001 failed to delete file " + infile.getName(), e);
219         }
220     }
221
222     boolean pruneRecords() {
223         boolean did1 = false;
224         long count = countRecords();
225         Parameters defaultLogRetention = Parameters.getParameter(Parameters.DEFAULT_LOG_RETENTION);
226         long threshold = (defaultLogRetention != null) ? Long.parseLong(defaultLogRetention.getValue()) : 1000000L;
227         Parameters provLogRetention = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
228         if (provLogRetention != null) {
229             try {
230                 long retention = Long.parseLong(provLogRetention.getValue());
231                 // This check is to prevent inadvertent errors from wiping the table out
232                 if (retention > 1000000L) {
233                     threshold = retention;
234                 }
235             } catch (NumberFormatException e) {
236                 // ignore
237             }
238         }
239         logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
240         if (count > threshold) {
241             // we need to remove this many records
242             count -= threshold;
243             // histogram of records per day
244             Map<Long, Long> hist = getHistogram();
245             // Determine the cutoff point to remove the needed number of records
246             long sum = 0;
247             long cutoff = 0;
248             for (Long day : new TreeSet<>(hist.keySet())) {
249                 sum += hist.get(day);
250                 cutoff = day;
251                 if (sum >= count) {
252                     break;
253                 }
254             }
255             cutoff++;
256             // convert day to ms
257             cutoff *= 86400000L;
258             logger.debug("  Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
259
260             Connection conn = null;
261             try {
262                 // Limit to a million at a time to avoid typing up the DB for too long.
263                 conn = db.getConnection();
264                 try (PreparedStatement ps = conn.prepareStatement(
265                         "DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
266                     ps.setLong(1, cutoff);
267                     while (count > 0) {
268                         if (!ps.execute()) {
269                             int dcount = ps.getUpdateCount();
270                             count -= dcount;
271                             logger.debug("  " + dcount + " rows deleted.");
272                             did1 |= (dcount != 0);
273                             if (dcount == 0) {
274                                 count = 0;    // prevent inf. loops
275                             }
276                         } else {
277                             count = 0;    // shouldn't happen!
278                         }
279                     }
280                 }
281                 try (Statement stmt = conn.createStatement()) {
282                     stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
283                 }
284             } catch (SQLException e) {
285                 logger.error("LogfileLoader.pruneRecords: " + e.getMessage(), e);
286             } finally {
287                 db.release(conn);
288             }
289         }
290         return did1;
291     }
292
293     private long countRecords() {
294         long count = 0;
295         try (Connection conn = db.getConnection();
296             Statement stmt = conn.createStatement()) {
297             try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) {
298                 if (rs.next()) {
299                     count = rs.getLong("COUNT");
300                 }
301             } finally {
302                 db.release(conn);
303             }
304         } catch (SQLException e) {
305             logger.error("LogfileLoader.countRecords: " + e.getMessage(), e);
306         }
307         return count;
308     }
309
310     private Map<Long, Long> getHistogram() {
311         Map<Long, Long> map = new HashMap<>();
312         try (Connection conn = db.getConnection();
313             Statement stmt = conn.createStatement()) {
314             logger.debug("  LOG_RECORD table histogram...");
315             try (ResultSet rs = stmt.executeQuery(
316                     "SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) {
317                 while (rs.next()) {
318                     long day = rs.getLong("DAY");
319                     long cnt = rs.getLong("COUNT");
320                     map.put(day, cnt);
321                     logger.debug("  " + day + "  " + cnt);
322                 }
323             } finally {
324                 db.release(conn);
325             }
326         } catch (SQLException e) {
327             logger.error("LogfileLoader.getHistogram: " + e.getMessage(), e);
328         }
329         return map;
330     }
331
332     private void initializeNextid() {
333         Connection conn = null;
334         try {
335             conn = db.getConnection();
336             RLEBitSet nbs = new RLEBitSet();
337             try (Statement stmt = conn.createStatement()) {
338                 // Build a bitset of all records in the LOG_RECORDS table
339                 // We need to run this SELECT in stages, because otherwise we run out of memory!
340                 final long stepsize = 6000000L;
341                 boolean goAgain = true;
342                 for (long i = 0; goAgain; i += stepsize) {
343                     String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
344                     try (ResultSet rs = stmt.executeQuery(sql)) {
345                         goAgain = false;
346                         while (rs.next()) {
347                             long recordId = rs.getLong("RECORD_ID");
348                             nbs.set(recordId);
349                             goAgain = true;
350                         }
351                     }
352                 }
353             }
354             seqSet = nbs;
355             // Compare with the range for this server
356             // Determine the next ID for this set of record IDs
357             RLEBitSet tbs = (RLEBitSet) nbs.clone();
358             RLEBitSet idset = new RLEBitSet();
359             idset.set(setStart, setStart + SET_SIZE);
360             tbs.and(idset);
361             long bitLength = tbs.length();
362             nextId = (bitLength == 0) ? setStart : (bitLength - 1);
363             if (nextId >= setStart + SET_SIZE) {
364                 // Handle wraparound, when the IDs reach the end of our "range"
365                 Long[] last = null;
366                 Iterator<Long[]> li = tbs.getRangeIterator();
367                 while (li.hasNext()) {
368                     last = li.next();
369                 }
370                 if (last != null) {
371                     tbs.clear(last[0], last[1] + 1);
372                     bitLength = tbs.length();
373                     nextId = (bitLength == 0) ? setStart : (bitLength - 1);
374                 }
375             }
376             logger.debug(String.format("LogfileLoader.initializeNextid, next ID is %d (%x)", nextId, nextId));
377         } catch (SQLException e) {
378             logger.error("LogfileLoader.initializeNextid: " + e.getMessage(), e);
379         } finally {
380             db.release(conn);
381         }
382     }
383
384     @SuppressWarnings("resource")
385     int[] process(File file) {
386         int ok = 0;
387         int total = 0;
388         try {
389             Connection conn = db.getConnection();
390             PreparedStatement ps = conn.prepareStatement(INSERT_SQL);
391             Reader reader = file.getPath().endsWith(".gz")
392                 ? new InputStreamReader(new GZIPInputStream(new FileInputStream(file)))
393                 : new FileReader(file);
394             try (LineNumberReader in = new LineNumberReader(reader)) {
395                 String line;
396                 while ((line = in.readLine()) != null) {
397                     try {
398                         for (Loadable rec : buildRecords(line)) {
399                             rec.load(ps);
400                             if (rec instanceof LogRecord) {
401                                 LogRecord lr = ((LogRecord) rec);
402                                 if (!seqSet.get(lr.getRecordId())) {
403                                     ps.executeUpdate();
404                                     seqSet.set(lr.getRecordId());
405                                 } else {
406                                     logger.debug("Duplicate record ignored: " + lr.getRecordId());
407                                 }
408                             } else {
409                                 if (++nextId > setEnd) {
410                                     nextId = setStart;
411                                 }
412                                 ps.setLong(18, nextId);
413                                 ps.executeUpdate();
414                                 seqSet.set(nextId);
415                             }
416                             ps.clearParameters();
417                             ok++;
418                         }
419                     } catch (SQLException e) {
420                         logger.warn("PROV8003 Invalid value in record: " + line, e);
421                     } catch (NumberFormatException e) {
422                         logger.warn("PROV8004 Invalid number in record: " + line, e);
423                     } catch (ParseException e) {
424                         logger.warn("PROV8005 Invalid date in record: " + line, e);
425                     } catch (Exception e) {
426                         logger.warn("PROV8006 Invalid pattern in record: " + line, e);
427                     }
428                     total++;
429                 }
430             }
431             ps.close();
432             db.release(conn);
433         } catch (SQLException | IOException e) {
434             logger.warn("PROV8007 Exception reading " + file + ": " + e);
435         }
436         return new int[]{ok, total};
437     }
438
439     Loadable[] buildRecords(String line) throws ParseException {
440         String[] pp = line.split("\\|");
441         if (pp != null && pp.length >= 7) {
442             String rtype = pp[1].toUpperCase();
443             if ("PUB".equals(rtype) && pp.length == 11) {
444                 // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
445                 return new Loadable[]{new PublishRecord(pp)};
446             }
447             if ("DEL".equals(rtype) && pp.length == 12) {
448                 // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
449                 String[] subs = pp[4].split("\\s+");
450                 if (subs != null) {
451                     Loadable[] rv = new Loadable[subs.length];
452                     for (int i = 0; i < subs.length; i++) {
453                         // create a new record for each individual sub
454                         pp[4] = subs[i];
455                         rv[i] = new DeliveryRecord(pp);
456                     }
457                     return rv;
458                 }
459             }
460             if ("EXP".equals(rtype) && pp.length == 11) {
461                 // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
462                 ExpiryRecord expiryRecord = new ExpiryRecord(pp);
463                 if ("other".equals(expiryRecord.getReason())) {
464                     logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: "
465                                         + expiryRecord.getPublishId());
466                 }
467                 return new Loadable[]{expiryRecord};
468             }
469             if ("PBF".equals(rtype) && pp.length == 12) {
470                 // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
471                 return new Loadable[]{new PubFailRecord(pp)};
472             }
473             if ("DLX".equals(rtype) && pp.length == 7) {
474                 // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
475                 return new Loadable[]{new DeliveryExtraRecord(pp)};
476             }
477             if ("LOG".equals(rtype) && (pp.length == 19 || pp.length == 20)) {
478                 // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|
479                 // feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
480                 return new Loadable[]{new LogRecord(pp)};
481             }
482         }
483         logger.warn("PROV8002 bad record: " + line);
484         return new Loadable[0];
485     }
486
487     /**
488      * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
489      *
490      * @param str ignored
491      */
492     public static void main(String[] str) throws InterruptedException {
493         LogfileLoader.getLoader();
494         Thread.sleep(200000L);
495     }
496 }